[FLINK-6205] [FLINK-6069] [cep] Correct watermark/late events in side output.

With this change, the CEP library assumes that the watermark is
correct and treats as late any event whose timestamp is smaller
than that of the last seen watermark. Late events need not be
silently dropped: the user can specify a side output to send
them to.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/48890285
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/48890285
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/48890285

Branch: refs/heads/table-retraction
Commit: 48890285d4b1c285bebb971ae0dbfc310c6fcc0e
Parents: 1932240
Author: kl0u <kklou...@gmail.com>
Authored: Thu Mar 23 19:01:15 2017 +0100
Committer: kl0u <kklou...@gmail.com>
Committed: Fri Mar 31 11:16:50 2017 +0200

----------------------------------------------------------------------
 docs/dev/libs/cep.md                            |  53 ++++++++++
 .../apache/flink/cep/scala/PatternStream.scala  |  37 ++++++-
 .../org/apache/flink/cep/PatternStream.java     |  72 +++++++++++--
 .../AbstractKeyedCEPPatternOperator.java        |  69 ++++++++++---
 .../flink/cep/operator/CEPOperatorUtils.java    |  17 ++-
 .../cep/operator/KeyedCEPPatternOperator.java   |   4 +-
 .../TimeoutKeyedCEPPatternOperator.java         |   4 +-
 .../java/org/apache/flink/cep/CEPITCase.java    | 103 +++++++++++++++++++
 .../cep/operator/CEPMigration11to13Test.java    |   2 +
 .../cep/operator/CEPMigration12to13Test.java    |   3 +
 .../flink/cep/operator/CEPOperatorTest.java     |   2 +
 .../flink/cep/operator/CEPRescalingTest.java    |   1 +
 12 files changed, 333 insertions(+), 34 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/48890285/docs/dev/libs/cep.md
----------------------------------------------------------------------
diff --git a/docs/dev/libs/cep.md b/docs/dev/libs/cep.md
index bb704c7..643d6ee 100644
--- a/docs/dev/libs/cep.md
+++ b/docs/dev/libs/cep.md
@@ -777,6 +777,59 @@ DataStream[Either[TimeoutEvent, ComplexEvent]] result = 
patternStream.flatSelect
 </div>
 </div>
 
+### Handling Lateness in Event Time
+
+In `CEP` the order in which elements are processed matters. To guarantee that 
elements are processed in the correct order
+when working in event time, an incoming element is initially put in a buffer 
where elements are *sorted in ascending 
+order based on their timestamp*, and when a watermark arrives, all the 
elements in this buffer with timestamps smaller 
+than that of the watermark are processed. This implies that elements between 
watermarks are processed in event-time order. 
+
+<span class="label label-danger">Attention</span> The library assumes 
correctness of the watermark when working 
+in event time.
+
+To also guarantee that elements across watermarks are processed in event-time 
order, Flink's CEP library assumes 
+*correctness of the watermark*, and considers as *late* elements whose 
timestamp is smaller than that of the last 
+seen watermark. Late elements are not further processed, but they can be redirected to a dedicated
+[side output]({{ site.baseurl }}/dev/stream/side_output.html).
+
+To access the stream of late elements, first register an output tag by calling
+`.withLateDataOutputTag(OutputTag)` on the `PatternStream` returned by the `CEP.pattern(...)` call. If you do not do
+so, the late elements will be silently dropped. Then, you can get the side-output stream by calling
+`.getSideOutput(OutputTag)` on the same `PatternStream`, passing as argument the output tag given to
+`.withLateDataOutputTag(OutputTag)`:
+
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
+{% highlight java %}
+final OutputTag<T> lateOutputTag = new OutputTag<T>("late-data"){};
+
+PatternStream<T> patternStream = CEP.pattern(...)
+    .withLateDataOutputTag(lateOutputTag);
+
+// main output with matches
+DataStream<O> result = patternStream.select(...);
+
+// side output containing the late events
+DataStream<T> lateStream = patternStream.getSideOutput(lateOutputTag);
+{% endhighlight %}
+</div>
+
+<div data-lang="scala" markdown="1">
+{% highlight scala %}
+val lateOutputTag = OutputTag[T]("late-data")
+
+val patternStream: PatternStream[T] = CEP.pattern(...)
+    .withLateDataOutputTag(lateOutputTag)
+
+// main output with matches
+val result = patternStream.select(...)
+
+// side output containing the late events
+val lateStream = patternStream.getSideOutput(lateOutputTag)
+{% endhighlight %}
+</div>
+</div>
+
 ## Examples
 
 The following example detects the pattern `start, middle(name = "error") -> 
end(name = "critical")` on a keyed data stream of `Events`.
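
The buffering and lateness rule described in the docs hunk above can be modelled in plain Java. This is only a sketch, not Flink code: the `LatenessSketch` class, its `Event` type and the two result lists are hypothetical stand-ins for the operator, its input records, the main output and the late-data side output. It follows the wording of the docs in flushing elements with timestamps strictly smaller than the watermark; the real operator may treat that boundary, and the initial watermark value, differently.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.PriorityQueue;

/** Plain-Java model of the lateness rule; hypothetical, not part of Flink. */
public class LatenessSketch {

    /** Hypothetical event type: a name plus an event-time timestamp. */
    public static final class Event {
        public final String name;
        public final long timestamp;

        public Event(String name, long timestamp) {
            this.name = name;
            this.timestamp = timestamp;
        }
    }

    // Buffer sorted in ascending timestamp order, as the docs describe.
    private final PriorityQueue<Event> buffer =
            new PriorityQueue<>(Comparator.comparingLong((Event e) -> e.timestamp));

    // Stand-ins for the main processing path and the late-data side output.
    public final List<String> processed = new ArrayList<>();
    public final List<String> late = new ArrayList<>();

    // Assumption: start below every possible timestamp so nothing is late at first.
    private long lastWatermark = Long.MIN_VALUE;

    /** Buffers the element, or diverts it if it is older than the last watermark. */
    public void onElement(Event e) {
        if (e.timestamp >= lastWatermark) {
            buffer.offer(e);
        } else {
            late.add(e.name);
        }
    }

    /** Processes buffered elements older than the watermark, then remembers it. */
    public void onWatermark(long watermark) {
        while (!buffer.isEmpty() && buffer.peek().timestamp < watermark) {
            processed.add(buffer.poll().name);
        }
        lastWatermark = watermark;
    }

    public static void main(String[] args) {
        LatenessSketch op = new LatenessSketch();
        op.onElement(new Event("b", 5));
        op.onElement(new Event("a", 3));
        op.onWatermark(6);               // flushes a, then b, in timestamp order
        op.onElement(new Event("c", 4)); // 4 < 6, so c is late
        op.onElement(new Event("d", 7));
        op.onWatermark(10);              // flushes d
        System.out.println("processed = " + op.processed); // processed = [a, b, d]
        System.out.println("late = " + op.late);           // late = [c]
    }
}
```

Keeping the last watermark around is what gives ordering across watermarks: without the check in `onElement`, `c` would be buffered and processed after `a` and `b` even though its timestamp falls between theirs.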

http://git-wip-us.apache.org/repos/asf/flink/blob/48890285/flink-libraries/flink-cep-scala/src/main/scala/org/apache/flink/cep/scala/PatternStream.scala
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-cep-scala/src/main/scala/org/apache/flink/cep/scala/PatternStream.scala
 
b/flink-libraries/flink-cep-scala/src/main/scala/org/apache/flink/cep/scala/PatternStream.scala
index 6207049..fb09c15 100644
--- 
a/flink-libraries/flink-cep-scala/src/main/scala/org/apache/flink/cep/scala/PatternStream.scala
+++ 
b/flink-libraries/flink-cep-scala/src/main/scala/org/apache/flink/cep/scala/PatternStream.scala
@@ -22,12 +22,13 @@ import java.util.{Map => JMap}
 import org.apache.flink.api.common.typeinfo.TypeInformation
 import org.apache.flink.cep.{PatternFlatSelectFunction, 
PatternFlatTimeoutFunction, PatternSelectFunction, PatternTimeoutFunction, 
PatternStream => JPatternStream}
 import org.apache.flink.cep.pattern.{Pattern => JPattern}
-import org.apache.flink.streaming.api.scala._
-import org.apache.flink.util.Collector
+import org.apache.flink.streaming.api.scala.{asScalaStream, _}
+import org.apache.flink.util.{Collector, OutputTag}
 import org.apache.flink.types.{Either => FEither}
 import org.apache.flink.api.java.tuple.{Tuple2 => FTuple2}
 import java.lang.{Long => JLong}
 
+import org.apache.flink.annotation.PublicEvolving
 import org.apache.flink.cep.operator.CEPOperatorUtils
 import org.apache.flink.cep.scala.pattern.Pattern
 
@@ -45,8 +46,23 @@ import scala.collection.mutable
   */
 class PatternStream[T](jPatternStream: JPatternStream[T]) {
 
+  private[flink] var lateDataOutputTag: OutputTag[T] = null
+
   private[flink] def wrappedPatternStream = jPatternStream
 
+
+  /**
+    * Send late arriving data to the side output identified by the given 
{@link OutputTag}. The
+    * CEP library assumes correctness of the watermark, so an element is 
considered late if its
+    * timestamp is smaller than the last received watermark.
+    */
+  @PublicEvolving
+  def withLateDataOutputTag(outputTag: OutputTag[T]): PatternStream[T] = {
+    jPatternStream.withLateDataOutputTag(outputTag)
+    lateDataOutputTag = outputTag
+    this
+  }
+
   def getPattern: Pattern[T, T] = 
Pattern(jPatternStream.getPattern.asInstanceOf[JPattern[T, T]])
 
   def getInputStream: DataStream[T] = 
asScalaStream(jPatternStream.getInputStream())
@@ -93,7 +109,8 @@ class PatternStream[T](jPatternStream: JPatternStream[T]) {
 
     val patternStream = CEPOperatorUtils.createTimeoutPatternStream(
       jPatternStream.getInputStream(),
-      jPatternStream.getPattern())
+      jPatternStream.getPattern(),
+      lateDataOutputTag)
 
     val cleanedSelect = cleanClosure(patternSelectFunction)
     val cleanedTimeout = cleanClosure(patternTimeoutFunction)
@@ -158,7 +175,8 @@ class PatternStream[T](jPatternStream: JPatternStream[T]) {
   : DataStream[Either[L, R]] = {
     val patternStream = CEPOperatorUtils.createTimeoutPatternStream(
       jPatternStream.getInputStream(),
-      jPatternStream.getPattern()
+      jPatternStream.getPattern(),
+      lateDataOutputTag
     )
 
     val cleanedSelect = cleanClosure(patternFlatSelectFunction)
@@ -317,6 +335,17 @@ class PatternStream[T](jPatternStream: JPatternStream[T]) {
 
     flatSelect(patternFlatTimeoutFun, patternFlatSelectFun)
   }
+
+  /**
+    * Gets the {@link DataStream} that contains the elements that are emitted 
from an operation
+    * into the side output with the given {@link OutputTag}.
+    *
+    * @param tag The tag identifying a specific side output.
+    */
+    @PublicEvolving
+    def getSideOutput[X: TypeInformation](tag: OutputTag[X]): DataStream[X] = {
+      asScalaStream(jPatternStream.getSideOutput(tag))
+    }
 }
 
 object PatternStream {

http://git-wip-us.apache.org/repos/asf/flink/blob/48890285/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/PatternStream.java
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/PatternStream.java
 
b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/PatternStream.java
index efcd16c..87666a5 100644
--- 
a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/PatternStream.java
+++ 
b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/PatternStream.java
@@ -27,8 +27,11 @@ import org.apache.flink.api.java.typeutils.TypeExtractor;
 import org.apache.flink.cep.operator.CEPOperatorUtils;
 import org.apache.flink.cep.pattern.Pattern;
 import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
 import org.apache.flink.types.Either;
 import org.apache.flink.util.Collector;
+import org.apache.flink.util.OutputTag;
+import org.apache.flink.util.Preconditions;
 
 import java.util.Map;
 
@@ -50,6 +53,19 @@ public class PatternStream<T> {
 
        private final Pattern<T, ?> pattern;
 
+       /**
+        * A reference to the created pattern stream used to get
+        * the registered side outputs, e.g. the late elements side output.
+        */
+       private SingleOutputStreamOperator<?> patternStream;
+
+       /**
+        * {@link OutputTag} to use for late arriving events. Elements whose
+        * timestamp is smaller than the last seen watermark will be emitted
+        * to this.
+        */
+       private OutputTag<T> lateDataOutputTag;
+
        PatternStream(final DataStream<T> inputStream, final Pattern<T, ?> 
pattern) {
                this.inputStream = inputStream;
                this.pattern = pattern;
@@ -64,6 +80,22 @@ public class PatternStream<T> {
        }
 
        /**
+        * Send late arriving data to the side output identified by the given 
{@link OutputTag}. The
+        * CEP library assumes correctness of the watermark, so an element is 
considered late if its
+        * timestamp is smaller than the last received watermark.
+        */
+       public PatternStream<T> withLateDataOutputTag(OutputTag<T> outputTag) {
+               Preconditions.checkNotNull(outputTag, "Side output tag must not 
be null.");
+               Preconditions.checkArgument(lateDataOutputTag == null,
+                               "The late side output tag has already been 
initialized to " + lateDataOutputTag + ".");
+               Preconditions.checkArgument(patternStream == null,
+                               "The late side output tag has to be set before 
calling select() or flatSelect().");
+
+               this.lateDataOutputTag = 
inputStream.getExecutionEnvironment().clean(outputTag);
+               return this;
+       }
+
+       /**
         * Applies a select function to the detected pattern sequence. For each 
pattern sequence the
         * provided {@link PatternSelectFunction} is called. The pattern select 
function can produce
         * exactly one resulting element.
@@ -74,7 +106,7 @@ public class PatternStream<T> {
         * @return {@link DataStream} which contains the resulting elements 
from the pattern select
         *         function.
         */
-       public <R> DataStream<R> select(final PatternSelectFunction<T, R> 
patternSelectFunction) {
+       public <R> SingleOutputStreamOperator<R> select(final 
PatternSelectFunction<T, R> patternSelectFunction) {
                // we have to extract the output type from the provided pattern 
selection function manually
                // because the TypeExtractor cannot do that if the method is 
wrapped in a MapFunction
 
@@ -102,8 +134,10 @@ public class PatternStream<T> {
         * @return {@link DataStream} which contains the resulting elements 
from the pattern select
         *         function.
         */
-       public <R> DataStream<R> select(final PatternSelectFunction<T, R> 
patternSelectFunction, TypeInformation<R> outTypeInfo) {
-               DataStream<Map<String, T>> patternStream = 
CEPOperatorUtils.createPatternStream(inputStream, pattern);
+       public <R> SingleOutputStreamOperator<R> select(final 
PatternSelectFunction<T, R> patternSelectFunction, TypeInformation<R> 
outTypeInfo) {
+               SingleOutputStreamOperator<Map<String, T>> patternStream =
+                               
CEPOperatorUtils.createPatternStream(inputStream, pattern, lateDataOutputTag);
+               this.patternStream = patternStream;
 
                return patternStream.map(
                        new PatternSelectMapper<>(
@@ -129,11 +163,13 @@ public class PatternStream<T> {
         * @return {@link DataStream} which contains the resulting elements or 
the resulting timeout
         * elements wrapped in an {@link Either} type.
         */
-       public <L, R> DataStream<Either<L, R>> select(
+       public <L, R> SingleOutputStreamOperator<Either<L, R>> select(
                final PatternTimeoutFunction<T, L> patternTimeoutFunction,
                final PatternSelectFunction<T, R> patternSelectFunction) {
 
-               DataStream<Either<Tuple2<Map<String, T>, Long>, Map<String, 
T>>> patternStream = CEPOperatorUtils.createTimeoutPatternStream(inputStream, 
pattern);
+               SingleOutputStreamOperator<Either<Tuple2<Map<String, T>, Long>, 
Map<String, T>>> patternStream =
+                               
CEPOperatorUtils.createTimeoutPatternStream(inputStream, pattern, 
lateDataOutputTag);
+               this.patternStream = patternStream;
 
                TypeInformation<L> leftTypeInfo = 
TypeExtractor.getUnaryOperatorReturnType(
                        patternTimeoutFunction,
@@ -174,7 +210,7 @@ public class PatternStream<T> {
         * @return {@link DataStream} which contains the resulting elements 
from the pattern flat select
         *         function.
         */
-       public <R> DataStream<R> flatSelect(final PatternFlatSelectFunction<T, 
R> patternFlatSelectFunction) {
+       public <R> SingleOutputStreamOperator<R> flatSelect(final 
PatternFlatSelectFunction<T, R> patternFlatSelectFunction) {
                // we have to extract the output type from the provided pattern 
selection function manually
                // because the TypeExtractor cannot do that if the method is 
wrapped in a MapFunction
                TypeInformation<R> outTypeInfo = 
TypeExtractor.getUnaryOperatorReturnType(
@@ -201,8 +237,10 @@ public class PatternStream<T> {
         * @return {@link DataStream} which contains the resulting elements 
from the pattern flat select
         *         function.
         */
-       public <R> DataStream<R> flatSelect(final PatternFlatSelectFunction<T, 
R> patternFlatSelectFunction, TypeInformation<R> outTypeInfo) {
-               DataStream<Map<String, T>> patternStream = 
CEPOperatorUtils.createPatternStream(inputStream, pattern);
+       public <R> SingleOutputStreamOperator<R> flatSelect(final 
PatternFlatSelectFunction<T, R> patternFlatSelectFunction, TypeInformation<R> 
outTypeInfo) {
+               SingleOutputStreamOperator<Map<String, T>> patternStream =
+                               
CEPOperatorUtils.createPatternStream(inputStream, pattern, lateDataOutputTag);
+               this.patternStream = patternStream;
 
                return patternStream.flatMap(
                        new PatternFlatSelectMapper<>(
@@ -229,11 +267,13 @@ public class PatternStream<T> {
         * function or the resulting timeout events from the pattern flat 
timeout function wrapped in an
         * {@link Either} type.
         */
-       public <L, R> DataStream<Either<L, R>> flatSelect(
+       public <L, R> SingleOutputStreamOperator<Either<L, R>> flatSelect(
                final PatternFlatTimeoutFunction<T, L> 
patternFlatTimeoutFunction,
                final PatternFlatSelectFunction<T, R> 
patternFlatSelectFunction) {
 
-               DataStream<Either<Tuple2<Map<String, T>, Long>, Map<String, 
T>>> patternStream = CEPOperatorUtils.createTimeoutPatternStream(inputStream, 
pattern);
+               SingleOutputStreamOperator<Either<Tuple2<Map<String, T>, Long>, 
Map<String, T>>> patternStream =
+                               
CEPOperatorUtils.createTimeoutPatternStream(inputStream, pattern, 
lateDataOutputTag);
+               this.patternStream = patternStream;
 
                TypeInformation<L> leftTypeInfo = 
TypeExtractor.getUnaryOperatorReturnType(
                        patternFlatTimeoutFunction,
@@ -264,6 +304,18 @@ public class PatternStream<T> {
        }
 
        /**
+        * Gets the {@link DataStream} that contains the elements that are 
emitted from an operation
+        * into the side output with the given {@link OutputTag}.
+        *
+        * @param sideOutputTag The tag identifying a specific side output.
+        */
+       public <X> DataStream<X> getSideOutput(OutputTag<X> sideOutputTag) {
+               Preconditions.checkNotNull(patternStream, "The operator has not 
been initialized. " +
+                               "To have the late element side output, you have 
to first define the main output using select() or flatSelect().");
+               return patternStream.getSideOutput(sideOutputTag);
+       }
+
+       /**
         * Wrapper for a {@link PatternSelectFunction}.
         *
         * @param <T> Type of the input elements
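
The precondition checks in the PatternStream.java hunks above imply a call order: set the late-data tag at most once and before select() or flatSelect(), and ask for the side output only after one of them has been called. The following plain-Java model illustrates that order. It is a simplified stand-in, not the Flink class: `CallOrderSketch`, its string-valued tag and its return values are hypothetical, and only the exception types mirror what Flink's `Preconditions.checkNotNull` and `Preconditions.checkArgument` throw.

```java
/** Simplified model of the call-order checks; hypothetical, not part of Flink. */
public class CallOrderSketch {

    private String lateTag;        // plays the role of lateDataOutputTag
    private boolean selectCalled;  // plays the role of patternStream being non-null

    /** Must be called at most once, and before select(). */
    public CallOrderSketch withLateDataOutputTag(String tag) {
        if (tag == null) {
            throw new NullPointerException("Side output tag must not be null.");
        }
        if (lateTag != null) {
            throw new IllegalArgumentException("Late tag already set to " + lateTag + ".");
        }
        if (selectCalled) {
            throw new IllegalArgumentException("Late tag has to be set before select().");
        }
        lateTag = tag;
        return this;
    }

    /** Stands in for select()/flatSelect(), which create the operator. */
    public void select() {
        selectCalled = true;
    }

    /** Only valid once select() has created the operator. */
    public String getSideOutput(String tag) {
        if (!selectCalled) {
            throw new NullPointerException("Define the main output with select() first.");
        }
        return "side-output:" + tag;
    }

    public static void main(String[] args) {
        // Valid order: tag, then select, then side output.
        CallOrderSketch stream = new CallOrderSketch().withLateDataOutputTag("late-data");
        stream.select();
        System.out.println(stream.getSideOutput("late-data"));

        // Invalid order: asking for the side output before select().
        try {
            new CallOrderSketch().getSideOutput("late-data");
        } catch (NullPointerException e) {
            System.out.println("rejected: " + e.getMessage());
        }
    }
}
```

The Java and Scala examples in the docs hunk follow the valid order: they call `select(...)` before `getSideOutput(lateOutputTag)`.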

http://git-wip-us.apache.org/repos/asf/flink/blob/48890285/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/AbstractKeyedCEPPatternOperator.java
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/AbstractKeyedCEPPatternOperator.java
 
b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/AbstractKeyedCEPPatternOperator.java
index 3e18660..b232dbb 100644
--- 
a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/AbstractKeyedCEPPatternOperator.java
+++ 
b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/AbstractKeyedCEPPatternOperator.java
@@ -40,6 +40,7 @@ import org.apache.flink.streaming.api.watermark.Watermark;
 import org.apache.flink.streaming.runtime.streamrecord.StreamElement;
 import org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.util.OutputTag;
 import org.apache.flink.util.Preconditions;
 
 import java.io.IOException;
@@ -89,6 +90,19 @@ public abstract class AbstractKeyedCEPPatternOperator<IN, 
KEY, OUT>
        private final NFACompiler.NFAFactory<IN> nfaFactory;
 
        /**
+        * {@link OutputTag} to use for late arriving events. Elements whose
+        * timestamp is smaller than the last seen watermark will be emitted
+        * to this.
+        */
+       private final OutputTag<IN> lateDataOutputTag;
+
+       /**
+        * The last seen watermark. This will be used to
+        * decide if an incoming element is late or not.
+        */
+       private long lastWatermark;
+
+       /**
         * A flag used in the case of migration that indicates if
         * we are restoring from an old keyed or non-keyed operator.
         */
@@ -100,6 +114,7 @@ public abstract class AbstractKeyedCEPPatternOperator<IN, 
KEY, OUT>
                        final KeySelector<IN, KEY> keySelector,
                        final TypeSerializer<KEY> keySerializer,
                        final NFACompiler.NFAFactory<IN> nfaFactory,
+                       final OutputTag<IN> lateDataOutputTag,
                        final boolean migratingFromOldKeyedOperator) {
 
                this.inputSerializer = 
Preconditions.checkNotNull(inputSerializer);
@@ -107,11 +122,9 @@ public abstract class AbstractKeyedCEPPatternOperator<IN, 
KEY, OUT>
                this.keySelector = Preconditions.checkNotNull(keySelector);
                this.keySerializer = Preconditions.checkNotNull(keySerializer);
                this.nfaFactory = Preconditions.checkNotNull(nfaFactory);
-               this.migratingFromOldKeyedOperator = 
migratingFromOldKeyedOperator;
-       }
 
-       public TypeSerializer<IN> getInputSerializer() {
-               return inputSerializer;
+               this.lateDataOutputTag = lateDataOutputTag;
+               this.migratingFromOldKeyedOperator = 
migratingFromOldKeyedOperator;
        }
 
        @Override
@@ -159,6 +172,7 @@ public abstract class AbstractKeyedCEPPatternOperator<IN, 
KEY, OUT>
                                        // 3) advance the time to the current 
watermark, so that expired patterns are discarded.
                                        // 4) update the stored state for the 
key, by only storing the new NFA and priority queue iff they
                                        //              have state to be used 
later.
+                                       // 5) update the last seen watermark.
 
                                        // STEP 1
                                        PriorityQueue<StreamRecord<IN>> 
priorityQueue = getPriorityQueue();
@@ -180,6 +194,9 @@ public abstract class AbstractKeyedCEPPatternOperator<IN, 
KEY, OUT>
                                        if (priorityQueue.isEmpty() && 
nfa.isEmpty()) {
                                                
watermarkCallbackService.unregisterKeyFromWatermarkCallback(key);
                                        }
+
+                                       // STEP 5
+                                       updateLastSeenWatermark(watermark);
                                }
                        },
                        keySerializer
@@ -196,19 +213,45 @@ public abstract class AbstractKeyedCEPPatternOperator<IN, 
KEY, OUT>
                        updateNFA(nfa);
 
                } else {
-                       
getInternalWatermarkCallbackService().registerKeyForWatermarkCallback(keySelector.getKey(element.getValue()));
 
-                       PriorityQueue<StreamRecord<IN>> priorityQueue = 
getPriorityQueue();
+                       // In event-time processing we assume correctness of 
the watermark.
+                       // Events with timestamp smaller than the last seen 
watermark are considered late.
+                       // Late events are put in a dedicated side output, if 
the user has specified one.
+
+                       if (element.getTimestamp() >= lastWatermark) {
 
-                       // event time processing
-                       // we have to buffer the elements until we receive the 
proper watermark
-                       if (getExecutionConfig().isObjectReuseEnabled()) {
-                               // copy the StreamRecord so that it cannot be 
changed
-                               priorityQueue.offer(new 
StreamRecord<IN>(inputSerializer.copy(element.getValue()), 
element.getTimestamp()));
+                               // we have an event with a valid timestamp, so
+                               // we buffer it until we receive the proper 
watermark.
+
+                               
getInternalWatermarkCallbackService().registerKeyForWatermarkCallback(keySelector.getKey(element.getValue()));
+
+                               PriorityQueue<StreamRecord<IN>> priorityQueue = 
getPriorityQueue();
+                               if 
(getExecutionConfig().isObjectReuseEnabled()) {
+                                       // copy the StreamRecord so that it 
cannot be changed
+                                       priorityQueue.offer(new 
StreamRecord<>(inputSerializer.copy(element.getValue()), 
element.getTimestamp()));
+                               } else {
+                                       priorityQueue.offer(element);
+                               }
+                               updatePriorityQueue(priorityQueue);
                        } else {
-                               priorityQueue.offer(element);
+                               sideOutputLateElement(element);
                        }
-                       updatePriorityQueue(priorityQueue);
+               }
+       }
+
+       private void updateLastSeenWatermark(Watermark watermark) {
+               this.lastWatermark = watermark.getTimestamp();
+       }
+
+       /**
+        * Puts the provided late element in the dedicated side output,
+        * if the user has specified one.
+        *
+        * @param element The late element.
+        */
+       private void sideOutputLateElement(StreamRecord<IN> element) {
+               if (lateDataOutputTag != null) {
+                       output.collect(lateDataOutputTag, element);
                }
        }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/48890285/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/CEPOperatorUtils.java
----------------------------------------------------------------------
diff --git 
a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/CEPOperatorUtils.java
 
b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/CEPOperatorUtils.java
index a5eef45..c12680f 100644
--- 
a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/CEPOperatorUtils.java
+++ 
b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/CEPOperatorUtils.java
@@ -33,7 +33,9 @@ import org.apache.flink.cep.pattern.Pattern;
 import org.apache.flink.streaming.api.TimeCharacteristic;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.datastream.KeyedStream;
+import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
 import org.apache.flink.types.Either;
+import org.apache.flink.util.OutputTag;
 
 import java.util.Map;
 
@@ -46,7 +48,7 @@ public class CEPOperatorUtils {
         * @return Data stream containing fully matched event sequences stored 
in a {@link Map}. The
         * events are indexed by their associated names of the pattern.
         */
-       public static <K, T> DataStream<Map<String, T>> 
createPatternStream(DataStream<T> inputStream, Pattern<T, ?> pattern) {
+       public static <K, T> SingleOutputStreamOperator<Map<String, T>> 
createPatternStream(DataStream<T> inputStream, Pattern<T, ?> pattern, 
OutputTag<T> lateDataOutputTag) {
                final TypeSerializer<T> inputSerializer = 
inputStream.getType().createSerializer(inputStream.getExecutionConfig());
 
                // check whether we use processing time
@@ -55,7 +57,7 @@ public class CEPOperatorUtils {
                // compile our pattern into a NFAFactory to instantiate NFAs 
later on
                final NFACompiler.NFAFactory<T> nfaFactory = 
NFACompiler.compileFactory(pattern, inputSerializer, false);
 
-               final DataStream<Map<String, T>> patternStream;
+               final SingleOutputStreamOperator<Map<String, T>> patternStream;
 
                if (inputStream instanceof KeyedStream) {
                        // We have to use the KeyedCEPPatternOperator which can 
deal with keyed input streams
@@ -73,6 +75,7 @@ public class CEPOperatorUtils {
                                        keySelector,
                                        keySerializer,
                                        nfaFactory,
+                                       lateDataOutputTag,
                                        true));
                } else {
 
@@ -88,6 +91,7 @@ public class CEPOperatorUtils {
                                        keySelector,
                                        keySerializer,
                                        nfaFactory,
+                                       lateDataOutputTag,
                                        false
                                )).forceNonParallel();
                }
@@ -104,7 +108,8 @@ public class CEPOperatorUtils {
         * @return Data stream containing fully matched and partially matched 
event sequences wrapped in
         * a {@link Either} instance.
         */
-       public static <K, T> DataStream<Either<Tuple2<Map<String, T>, Long>, 
Map<String, T>>> createTimeoutPatternStream(DataStream<T> inputStream, 
Pattern<T, ?> pattern) {
+       public static <K, T> 
SingleOutputStreamOperator<Either<Tuple2<Map<String, T>, Long>, Map<String, 
T>>> createTimeoutPatternStream(
+                       DataStream<T> inputStream, Pattern<T, ?> pattern, 
OutputTag<T> lateDataOutputTag) {
 
                final TypeSerializer<T> inputSerializer = 
inputStream.getType().createSerializer(inputStream.getExecutionConfig());
 
@@ -114,7 +119,7 @@ public class CEPOperatorUtils {
                // compile our pattern into a NFAFactory to instantiate NFAs 
later on
                final NFACompiler.NFAFactory<T> nfaFactory = 
NFACompiler.compileFactory(pattern, inputSerializer, true);
 
-               final DataStream<Either<Tuple2<Map<String, T>, Long>, 
Map<String, T>>> patternStream;
+               final SingleOutputStreamOperator<Either<Tuple2<Map<String, T>, 
Long>, Map<String, T>>> patternStream;
 
                final TypeInformation<Map<String, T>> rightTypeInfo = 
(TypeInformation<Map<String, T>>) (TypeInformation<?>)  
TypeExtractor.getForClass(Map.class);
                final TypeInformation<Tuple2<Map<String, T>, Long>> 
leftTypeInfo = new TupleTypeInfo<>(rightTypeInfo, BasicTypeInfo.LONG_TYPE_INFO);
@@ -130,12 +135,13 @@ public class CEPOperatorUtils {
                        patternStream = keyedStream.transform(
                                "TimeoutKeyedCEPPatternOperator",
                                eitherTypeInformation,
-                               new TimeoutKeyedCEPPatternOperator<T, K>(
+                               new TimeoutKeyedCEPPatternOperator<>(
                                        inputSerializer,
                                        isProcessingTime,
                                        keySelector,
                                        keySerializer,
                                        nfaFactory,
+                                       lateDataOutputTag,
                                        true));
                } else {
 
@@ -151,6 +157,7 @@ public class CEPOperatorUtils {
                                        keySelector,
                                        keySerializer,
                                        nfaFactory,
+                                       lateDataOutputTag,
                                        false
                                )).forceNonParallel();
                }

http://git-wip-us.apache.org/repos/asf/flink/blob/48890285/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/KeyedCEPPatternOperator.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/KeyedCEPPatternOperator.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/KeyedCEPPatternOperator.java
index 21cee23..532bba3 100644
--- a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/KeyedCEPPatternOperator.java
+++ b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/KeyedCEPPatternOperator.java
@@ -24,6 +24,7 @@ import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.cep.nfa.NFA;
 import org.apache.flink.cep.nfa.compiler.NFACompiler;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.util.OutputTag;
 
 import java.util.Collection;
 import java.util.Iterator;
@@ -46,9 +47,10 @@ public class KeyedCEPPatternOperator<IN, KEY> extends AbstractKeyedCEPPatternOpe
                        KeySelector<IN, KEY> keySelector,
                        TypeSerializer<KEY> keySerializer,
                        NFACompiler.NFAFactory<IN> nfaFactory,
+                       OutputTag<IN> lateDataOutputTag,
                        boolean migratingFromOldKeyedOperator) {
 
-               super(inputSerializer, isProcessingTime, keySelector, keySerializer, nfaFactory, migratingFromOldKeyedOperator);
+               super(inputSerializer, isProcessingTime, keySelector, keySerializer, nfaFactory, lateDataOutputTag, migratingFromOldKeyedOperator);
        }
 
        @Override

http://git-wip-us.apache.org/repos/asf/flink/blob/48890285/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/TimeoutKeyedCEPPatternOperator.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/TimeoutKeyedCEPPatternOperator.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/TimeoutKeyedCEPPatternOperator.java
index c6fba55..933bfd3 100644
--- a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/TimeoutKeyedCEPPatternOperator.java
+++ b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/TimeoutKeyedCEPPatternOperator.java
@@ -25,6 +25,7 @@ import org.apache.flink.cep.nfa.NFA;
 import org.apache.flink.cep.nfa.compiler.NFACompiler;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 import org.apache.flink.types.Either;
+import org.apache.flink.util.OutputTag;
 
 import java.util.Collection;
 import java.util.Map;
@@ -46,9 +47,10 @@ public class TimeoutKeyedCEPPatternOperator<IN, KEY> extends AbstractKeyedCEPPat
                        KeySelector<IN, KEY> keySelector,
                        TypeSerializer<KEY> keySerializer,
                        NFACompiler.NFAFactory<IN> nfaFactory,
+                       OutputTag<IN> lateDataOutputTag,
                        boolean migratingFromOldKeyedOperator) {
 
-               super(inputSerializer, isProcessingTime, keySelector, keySerializer, nfaFactory, migratingFromOldKeyedOperator);
+               super(inputSerializer, isProcessingTime, keySelector, keySerializer, nfaFactory, lateDataOutputTag, migratingFromOldKeyedOperator);
        }
 
        @Override

http://git-wip-us.apache.org/repos/asf/flink/blob/48890285/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/CEPITCase.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/CEPITCase.java b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/CEPITCase.java
index 42117ee..a5015df 100644
--- a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/CEPITCase.java
+++ b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/CEPITCase.java
@@ -33,6 +33,7 @@ import org.apache.flink.streaming.api.windowing.time.Time;
 import org.apache.flink.streaming.util.StreamingMultipleProgramsTestBase;
 
 import org.apache.flink.types.Either;
+import org.apache.flink.util.OutputTag;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Rule;
@@ -47,6 +48,9 @@ public class CEPITCase extends StreamingMultipleProgramsTestBase {
        private String resultPath;
        private String expected;
 
+       private String lateEventPath;
+       private String expectedLateEvents;
+
        @Rule
        public TemporaryFolder tempFolder = new TemporaryFolder();
 
@@ -54,11 +58,15 @@ public class CEPITCase extends StreamingMultipleProgramsTestBase {
        public void before() throws Exception {
                resultPath = tempFolder.newFile().toURI().toString();
                expected = "";
+
+               lateEventPath = tempFolder.newFile().toURI().toString();
+               expectedLateEvents = "";
        }
 
        @After
        public void after() throws Exception {
                compareResultsByLinesInMemory(expected, resultPath);
+               compareResultsByLinesInMemory(expectedLateEvents, lateEventPath);
        }
 
        /**
@@ -572,4 +580,99 @@ public class CEPITCase extends StreamingMultipleProgramsTestBase {
 
                env.execute();
        }
+
+       @Test
+       public void testLateEventSideOutput() throws Exception {
+               StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+               env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
+               env.setParallelism(1);
+
+               // (Event, timestamp)
+               DataStream<Event> input = env.fromElements(
+                               Tuple2.of(new Event(1, "start", 1.0), 1L),
+                               Tuple2.of(new Event(2, "middle", 2.0), 2L),
+                               Tuple2.of(new Event(3, "end", 3.0), 15L),
+                               Tuple2.of(new Event(4, "middle", 5.0), 7L),
+                               Tuple2.of(new Event(6, "start", 1.0), 21L),
+                               Tuple2.of(new Event(5, "middle", 5.0), 10L),
+                               Tuple2.of(new Event(7, "middle", 2.0), 22L),
+                               Tuple2.of(new Event(8, "end", 3.0), 23L)
+               ).assignTimestampsAndWatermarks(new AssignerWithPunctuatedWatermarks<Tuple2<Event,Long>>() {
+
+                       @Override
+                       public long extractTimestamp(Tuple2<Event, Long> element, long previousTimestamp) {
+                               return element.f1;
+                       }
+
+                       @Override
+                       public Watermark checkAndGetNextWatermark(Tuple2<Event, Long> lastElement, long extractedTimestamp) {
+                               return lastElement.f0.getName().equals("end") ? new Watermark(extractedTimestamp) : null;
+                       }
+
+               }).map(new MapFunction<Tuple2<Event, Long>, Event>() {
+
+                       @Override
+                       public Event map(Tuple2<Event, Long> value) throws Exception {
+                               return value.f0;
+                       }
+               });
+
+               Pattern<Event, ?> pattern = Pattern.<Event>begin("start").where(new SimpleCondition<Event>() {
+
+                       @Override
+                       public boolean filter(Event value) throws Exception {
+                               return value.getName().equals("start");
+                       }
+               }).followedBy("middle").where(new SimpleCondition<Event>() {
+
+                       @Override
+                       public boolean filter(Event value) throws Exception {
+                               return value.getName().equals("middle");
+                       }
+               }).followedBy("end").where(new SimpleCondition<Event>() {
+
+                       @Override
+                       public boolean filter(Event value) throws Exception {
+                               return value.getName().equals("end");
+                       }
+               });
+
+               final OutputTag<Event> lateOutputTag = new OutputTag<Event>("late-data"){};
+
+               PatternStream<Event> patternStream = CEP.pattern(input, pattern).withLateDataOutputTag(lateOutputTag);
+               DataStream<String> result = patternStream.select(
+                               new PatternSelectFunction<Event, String>() {
+
+                                       @Override
+                                       public String select(Map<String, Event> pattern) {
+                                               StringBuilder builder = new StringBuilder();
+
+                                               builder.append(pattern.get("start").getId()).append(",")
+                                                               .append(pattern.get("middle").getId()).append(",")
+                                                               .append(pattern.get("end").getId());
+                                               return builder.toString();
+                                       }
+                               }
+               );
+
+               DataStream<Event> lateEvents = patternStream.getSideOutput(lateOutputTag);
+
+               // we just care for the late events in this test.
+               lateEvents.map(
+                               new MapFunction<Event, Integer>() {
+
+                                       @Override
+                                       public Integer map(Event value) throws Exception {
+                                               return value.getId();
+                                       }
+                               }
+               ).writeAsText(lateEventPath, FileSystem.WriteMode.OVERWRITE);
+
+               // the expected sequence of late event ids
+               expectedLateEvents = "4\n5";
+
+               result.writeAsText(resultPath, FileSystem.WriteMode.OVERWRITE);
+               expected = "1,2,3\n1,2,8\n1,7,8\n6,7,8";
+               env.execute();
+       }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/48890285/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPMigration11to13Test.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPMigration11to13Test.java b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPMigration11to13Test.java
index b83eb3c..4e05fcf 100644
--- a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPMigration11to13Test.java
+++ b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPMigration11to13Test.java
@@ -103,6 +103,7 @@ public class CEPMigration11to13Test {
                                                                 keySelector,
                                                                 IntSerializer.INSTANCE,
                                                                 new NFAFactory(),
+                                                               null,
                                                                true),
                                                keySelector,
                                                BasicTypeInfo.INT_TYPE_INFO);
@@ -177,6 +178,7 @@ public class CEPMigration11to13Test {
                                                                 keySelector,
                                                                 ByteSerializer.INSTANCE,
                                                                 new NFAFactory(),
+                                                               null,
                                                                false),
                                                keySelector,
                                                BasicTypeInfo.BYTE_TYPE_INFO);

http://git-wip-us.apache.org/repos/asf/flink/blob/48890285/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPMigration12to13Test.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPMigration12to13Test.java b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPMigration12to13Test.java
index dbe4230..8249535 100644
--- a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPMigration12to13Test.java
+++ b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPMigration12to13Test.java
@@ -118,6 +118,7 @@ public class CEPMigration12to13Test {
                                        keySelector,
                                        IntSerializer.INSTANCE,
                                        new NFAFactory(),
+                                       null,
                                        true),
                                keySelector,
                                BasicTypeInfo.INT_TYPE_INFO);
@@ -233,6 +234,7 @@ public class CEPMigration12to13Test {
                                        keySelector,
                                        IntSerializer.INSTANCE,
                                        new NFAFactory(),
+                                       null,
                                        true),
                                keySelector,
                                BasicTypeInfo.INT_TYPE_INFO);
@@ -353,6 +355,7 @@ public class CEPMigration12to13Test {
                                        keySelector,
                                        IntSerializer.INSTANCE,
                                        new SinglePatternNFAFactory(),
+                                       null,
                                        true),
                                keySelector,
                                BasicTypeInfo.INT_TYPE_INFO);

http://git-wip-us.apache.org/repos/asf/flink/blob/48890285/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPOperatorTest.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPOperatorTest.java b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPOperatorTest.java
index 726c8b8..d599ec9 100644
--- a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPOperatorTest.java
+++ b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPOperatorTest.java
@@ -225,6 +225,7 @@ public class CEPOperatorTest extends TestLogger {
                                keySelector,
                                IntSerializer.INSTANCE,
                                new NFAFactory(true),
+                               null,
                                true),
                        keySelector,
                        BasicTypeInfo.INT_TYPE_INFO);
@@ -482,6 +483,7 @@ public class CEPOperatorTest extends TestLogger {
                        keySelector,
                        IntSerializer.INSTANCE,
                        new NFAFactory(),
+                       null,
                        true);
        }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/48890285/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPRescalingTest.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPRescalingTest.java b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPRescalingTest.java
index 2c86648..a048183 100644
--- a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPRescalingTest.java
+++ b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPRescalingTest.java
@@ -346,6 +346,7 @@ public class CEPRescalingTest {
                                 keySelector,
                                 BasicTypeInfo.INT_TYPE_INFO.createSerializer(new ExecutionConfig()),
                                 new NFAFactory(),
                                new NFAFactory(),
+                               null,
                                true),
                        keySelector,
                        BasicTypeInfo.INT_TYPE_INFO,
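
For readers who want to check the expected late-event ids in testLateEventSideOutput by hand, the following is a minimal standalone sketch of the lateness rule stated in the commit message: an event is late when its timestamp is smaller than the last seen watermark. It does not use Flink or the CEP operator. The class LateEventSketch, the TimedEvent type and the method names are hypothetical and exist only for illustration. It assumes, as in the test's punctuated assigner, that a watermark equal to the event timestamp is emitted after each "end" event and that watermarks never move backwards.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical illustration only; not part of the Flink CEP library.
final class LateEventSketch {

    /** Stand-in for the (Event, timestamp) tuples fed into the test. */
    public static final class TimedEvent {
        final int id;
        final String name;
        final long timestamp;

        public TimedEvent(int id, String name, long timestamp) {
            this.id = id;
            this.name = name;
            this.timestamp = timestamp;
        }
    }

    /** Returns the ids of events whose timestamp is smaller than the last seen watermark. */
    public static List<Integer> findLateEventIds(List<TimedEvent> events) {
        List<Integer> lateIds = new ArrayList<>();
        long lastWatermark = Long.MIN_VALUE;
        for (TimedEvent event : events) {
            // The watermark triggered by an element follows that element,
            // so lateness is checked against the previous watermark.
            if (event.timestamp < lastWatermark) {
                lateIds.add(event.id);
            }
            // Assumption: a watermark is emitted after every "end" event
            // and is never allowed to decrease.
            if ("end".equals(event.name)) {
                lastWatermark = Math.max(lastWatermark, event.timestamp);
            }
        }
        return lateIds;
    }

    /** The same ids, names and timestamps as the input of testLateEventSideOutput. */
    public static List<TimedEvent> sampleInput() {
        return Arrays.asList(
                new TimedEvent(1, "start", 1L),
                new TimedEvent(2, "middle", 2L),
                new TimedEvent(3, "end", 15L),
                new TimedEvent(4, "middle", 7L),
                new TimedEvent(6, "start", 21L),
                new TimedEvent(5, "middle", 10L),
                new TimedEvent(7, "middle", 22L),
                new TimedEvent(8, "end", 23L));
    }

    public static void main(String[] args) {
        System.out.println(findLateEventIds(sampleInput())); // prints [4, 5]
    }
}
```

Events 4 and 5 carry timestamps 7 and 10, both smaller than the watermark 15 produced by event 3, which agrees with expectedLateEvents = "4\n5" in the test.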
