[FLINK-6205] [FLINK-6069] [cep] Correct watermark/late events in side output.
With this change, the CEP library assumes that the watermark is correct and considers as late any event whose timestamp is smaller than that of the last seen watermark. Late events are not processed further: they are dropped unless the user specifies a side output to send them to.

Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/48890285
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/48890285
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/48890285

Branch: refs/heads/table-retraction
Commit: 48890285d4b1c285bebb971ae0dbfc310c6fcc0e
Parents: 1932240
Author: kl0u <kklou...@gmail.com>
Authored: Thu Mar 23 19:01:15 2017 +0100
Committer: kl0u <kklou...@gmail.com>
Committed: Fri Mar 31 11:16:50 2017 +0200

----------------------------------------------------------------------
 docs/dev/libs/cep.md | 53 ++++++++++
 .../apache/flink/cep/scala/PatternStream.scala | 37 ++++++-
 .../org/apache/flink/cep/PatternStream.java | 72 +++++++++++--
 .../AbstractKeyedCEPPatternOperator.java | 69 ++++++++++---
 .../flink/cep/operator/CEPOperatorUtils.java | 17 ++-
 .../cep/operator/KeyedCEPPatternOperator.java | 4 +-
 .../TimeoutKeyedCEPPatternOperator.java | 4 +-
 .../java/org/apache/flink/cep/CEPITCase.java | 103 +++++++++++++++++++
 .../cep/operator/CEPMigration11to13Test.java | 2 +
 .../cep/operator/CEPMigration12to13Test.java | 3 +
 .../flink/cep/operator/CEPOperatorTest.java | 2 +
 .../flink/cep/operator/CEPRescalingTest.java | 1 +
 12 files changed, 333 insertions(+), 34 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/flink/blob/48890285/docs/dev/libs/cep.md
----------------------------------------------------------------------
diff --git a/docs/dev/libs/cep.md b/docs/dev/libs/cep.md index bb704c7..643d6ee 100644 --- a/docs/dev/libs/cep.md +++ b/docs/dev/libs/cep.md @@ -777,6 +777,59 @@ DataStream[Either[TimeoutEvent, ComplexEvent]] result = 
patternStream.flatSelect </div> </div> +### Handling Lateness in Event Time + +In `CEP` the order in which elements are processed matters. To guarantee that elements are processed in the correct order +when working in event time, an incoming element is first put in a buffer where elements are *sorted in ascending +order based on their timestamp*. When a watermark arrives, all the elements in this buffer with timestamps smaller +than that of the watermark are processed. This implies that elements between watermarks are processed in event-time order. + +<span class="label label-danger">Attention</span> The library assumes correctness of the watermark when working +in event time. + +To also guarantee that elements across watermarks are processed in event-time order, Flink's CEP library assumes +*correctness of the watermark*, and considers as *late* any element whose timestamp is smaller than that of the last +seen watermark. Late elements are not processed further, but they can be redirected to a dedicated +[side output]({{ site.baseurl }}/dev/stream/side_output.html). + +To access the stream of late elements, first request the late data by calling +`.withLateDataOutputTag(OutputTag)` on the `PatternStream` returned by the `CEP.pattern(...)` call. If you do not do +so, the late elements will be silently dropped. Then, after defining the main output with `select()` or `flatSelect()`, +get the side-output stream by calling `.getSideOutput(OutputTag)` on the same `PatternStream`, passing the output tag +given to `.withLateDataOutputTag(OutputTag)`: + +<div class="codetabs" markdown="1"> +<div data-lang="java" markdown="1"> +{% highlight java %} +final OutputTag<T> lateOutputTag = new OutputTag<T>("late-data"){}; + +PatternStream<T> patternStream = CEP.pattern(...) + .withLateDataOutputTag(lateOutputTag); + +// main output with matches +DataStream<O> result = patternStream.select(...); 
+ +// side output containing the late events +DataStream<T> lateStream = patternStream.getSideOutput(lateOutputTag); +{% endhighlight %} +</div> + +<div data-lang="scala" markdown="1"> +{% highlight scala %} +val lateOutputTag = OutputTag[T]("late-data") + +val patternStream: PatternStream[T] = CEP.pattern(...) + .withLateDataOutputTag(lateOutputTag) + +// main output with matches +val result = patternStream.select(...) + +// side output containing the late events +val lateStream = patternStream.getSideOutput(lateOutputTag) +{% endhighlight %} +</div> +</div> + ## Examples The following example detects the pattern `start, middle(name = "error") -> end(name = "critical")` on a keyed data stream of `Events`. http://git-wip-us.apache.org/repos/asf/flink/blob/48890285/flink-libraries/flink-cep-scala/src/main/scala/org/apache/flink/cep/scala/PatternStream.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-cep-scala/src/main/scala/org/apache/flink/cep/scala/PatternStream.scala b/flink-libraries/flink-cep-scala/src/main/scala/org/apache/flink/cep/scala/PatternStream.scala index 6207049..fb09c15 100644 --- a/flink-libraries/flink-cep-scala/src/main/scala/org/apache/flink/cep/scala/PatternStream.scala +++ b/flink-libraries/flink-cep-scala/src/main/scala/org/apache/flink/cep/scala/PatternStream.scala @@ -22,12 +22,13 @@ import java.util.{Map => JMap} import org.apache.flink.api.common.typeinfo.TypeInformation import org.apache.flink.cep.{PatternFlatSelectFunction, PatternFlatTimeoutFunction, PatternSelectFunction, PatternTimeoutFunction, PatternStream => JPatternStream} import org.apache.flink.cep.pattern.{Pattern => JPattern} -import org.apache.flink.streaming.api.scala._ -import org.apache.flink.util.Collector +import org.apache.flink.streaming.api.scala.{asScalaStream, _} +import org.apache.flink.util.{Collector, OutputTag} import org.apache.flink.types.{Either => FEither} import 
org.apache.flink.api.java.tuple.{Tuple2 => FTuple2} import java.lang.{Long => JLong} +import org.apache.flink.annotation.PublicEvolving import org.apache.flink.cep.operator.CEPOperatorUtils import org.apache.flink.cep.scala.pattern.Pattern @@ -45,8 +46,23 @@ import scala.collection.mutable */ class PatternStream[T](jPatternStream: JPatternStream[T]) { + private[flink] var lateDataOutputTag: OutputTag[T] = null + private[flink] def wrappedPatternStream = jPatternStream + + /** + * Send late arriving data to the side output identified by the given {@link OutputTag}. The + * CEP library assumes correctness of the watermark, so an element is considered late if its + * timestamp is smaller than the last received watermark. + */ + @PublicEvolving + def withLateDataOutputTag(outputTag: OutputTag[T]): PatternStream[T] = { + jPatternStream.withLateDataOutputTag(outputTag) + lateDataOutputTag = outputTag + this + } + def getPattern: Pattern[T, T] = Pattern(jPatternStream.getPattern.asInstanceOf[JPattern[T, T]]) def getInputStream: DataStream[T] = asScalaStream(jPatternStream.getInputStream()) @@ -93,7 +109,8 @@ class PatternStream[T](jPatternStream: JPatternStream[T]) { val patternStream = CEPOperatorUtils.createTimeoutPatternStream( jPatternStream.getInputStream(), - jPatternStream.getPattern()) + jPatternStream.getPattern(), + lateDataOutputTag) val cleanedSelect = cleanClosure(patternSelectFunction) val cleanedTimeout = cleanClosure(patternTimeoutFunction) @@ -158,7 +175,8 @@ class PatternStream[T](jPatternStream: JPatternStream[T]) { : DataStream[Either[L, R]] = { val patternStream = CEPOperatorUtils.createTimeoutPatternStream( jPatternStream.getInputStream(), - jPatternStream.getPattern() + jPatternStream.getPattern(), + lateDataOutputTag ) val cleanedSelect = cleanClosure(patternFlatSelectFunction) @@ -317,6 +335,17 @@ class PatternStream[T](jPatternStream: JPatternStream[T]) { flatSelect(patternFlatTimeoutFun, patternFlatSelectFun) } + + /** + * Gets the {@link 
DataStream} that contains the elements that are emitted from an operation + * into the side output with the given {@link OutputTag}. + * + * @param tag The tag identifying a specific side output. + */ + @PublicEvolving + def getSideOutput[X: TypeInformation](tag: OutputTag[X]): DataStream[X] = { + asScalaStream(jPatternStream.getSideOutput(tag)) + } } object PatternStream { http://git-wip-us.apache.org/repos/asf/flink/blob/48890285/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/PatternStream.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/PatternStream.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/PatternStream.java index efcd16c..87666a5 100644 --- a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/PatternStream.java +++ b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/PatternStream.java @@ -27,8 +27,11 @@ import org.apache.flink.api.java.typeutils.TypeExtractor; import org.apache.flink.cep.operator.CEPOperatorUtils; import org.apache.flink.cep.pattern.Pattern; import org.apache.flink.streaming.api.datastream.DataStream; +import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator; import org.apache.flink.types.Either; import org.apache.flink.util.Collector; +import org.apache.flink.util.OutputTag; +import org.apache.flink.util.Preconditions; import java.util.Map; @@ -50,6 +53,19 @@ public class PatternStream<T> { private final Pattern<T, ?> pattern; + /** + * A reference to the created pattern stream used to get + * the registered side outputs, e.g. the late elements side output. + */ + private SingleOutputStreamOperator<?> patternStream; + + /** + * {@link OutputTag} to use for late arriving events. Elements whose + * timestamp is smaller than the last seen watermark will + * be emitted to this. 
+ */ + private OutputTag<T> lateDataOutputTag; + PatternStream(final DataStream<T> inputStream, final Pattern<T, ?> pattern) { this.inputStream = inputStream; this.pattern = pattern; @@ -64,6 +80,22 @@ public class PatternStream<T> { } /** + * Send late arriving data to the side output identified by the given {@link OutputTag}. The + * CEP library assumes correctness of the watermark, so an element is considered late if its + * timestamp is smaller than the last received watermark. + */ + public PatternStream<T> withLateDataOutputTag(OutputTag<T> outputTag) { + Preconditions.checkNotNull(outputTag, "Side output tag must not be null."); + Preconditions.checkArgument(lateDataOutputTag == null, + "The late side output tag has already been initialized to " + lateDataOutputTag + "."); + Preconditions.checkArgument(patternStream == null, + "The late side output tag has to be set before calling select() or flatSelect()."); + + this.lateDataOutputTag = inputStream.getExecutionEnvironment().clean(outputTag); + return this; + } + + /** * Applies a select function to the detected pattern sequence. For each pattern sequence the * provided {@link PatternSelectFunction} is called. The pattern select function can produce * exactly one resulting element. @@ -74,7 +106,7 @@ public class PatternStream<T> { * @return {@link DataStream} which contains the resulting elements from the pattern select * function. */ - public <R> DataStream<R> select(final PatternSelectFunction<T, R> patternSelectFunction) { + public <R> SingleOutputStreamOperator<R> select(final PatternSelectFunction<T, R> patternSelectFunction) { // we have to extract the output type from the provided pattern selection function manually // because the TypeExtractor cannot do that if the method is wrapped in a MapFunction @@ -102,8 +134,10 @@ public class PatternStream<T> { * @return {@link DataStream} which contains the resulting elements from the pattern select * function. 
*/ - public <R> DataStream<R> select(final PatternSelectFunction<T, R> patternSelectFunction, TypeInformation<R> outTypeInfo) { - DataStream<Map<String, T>> patternStream = CEPOperatorUtils.createPatternStream(inputStream, pattern); + public <R> SingleOutputStreamOperator<R> select(final PatternSelectFunction<T, R> patternSelectFunction, TypeInformation<R> outTypeInfo) { + SingleOutputStreamOperator<Map<String, T>> patternStream = + CEPOperatorUtils.createPatternStream(inputStream, pattern, lateDataOutputTag); + this.patternStream = patternStream; return patternStream.map( new PatternSelectMapper<>( @@ -129,11 +163,13 @@ public class PatternStream<T> { * @return {@link DataStream} which contains the resulting elements or the resulting timeout * elements wrapped in an {@link Either} type. */ - public <L, R> DataStream<Either<L, R>> select( + public <L, R> SingleOutputStreamOperator<Either<L, R>> select( final PatternTimeoutFunction<T, L> patternTimeoutFunction, final PatternSelectFunction<T, R> patternSelectFunction) { - DataStream<Either<Tuple2<Map<String, T>, Long>, Map<String, T>>> patternStream = CEPOperatorUtils.createTimeoutPatternStream(inputStream, pattern); + SingleOutputStreamOperator<Either<Tuple2<Map<String, T>, Long>, Map<String, T>>> patternStream = + CEPOperatorUtils.createTimeoutPatternStream(inputStream, pattern, lateDataOutputTag); + this.patternStream = patternStream; TypeInformation<L> leftTypeInfo = TypeExtractor.getUnaryOperatorReturnType( patternTimeoutFunction, @@ -174,7 +210,7 @@ public class PatternStream<T> { * @return {@link DataStream} which contains the resulting elements from the pattern flat select * function. 
*/ - public <R> DataStream<R> flatSelect(final PatternFlatSelectFunction<T, R> patternFlatSelectFunction) { + public <R> SingleOutputStreamOperator<R> flatSelect(final PatternFlatSelectFunction<T, R> patternFlatSelectFunction) { // we have to extract the output type from the provided pattern selection function manually // because the TypeExtractor cannot do that if the method is wrapped in a MapFunction TypeInformation<R> outTypeInfo = TypeExtractor.getUnaryOperatorReturnType( @@ -201,8 +237,10 @@ public class PatternStream<T> { * @return {@link DataStream} which contains the resulting elements from the pattern flat select * function. */ - public <R> DataStream<R> flatSelect(final PatternFlatSelectFunction<T, R> patternFlatSelectFunction, TypeInformation<R> outTypeInfo) { - DataStream<Map<String, T>> patternStream = CEPOperatorUtils.createPatternStream(inputStream, pattern); + public <R> SingleOutputStreamOperator<R> flatSelect(final PatternFlatSelectFunction<T, R> patternFlatSelectFunction, TypeInformation<R> outTypeInfo) { + SingleOutputStreamOperator<Map<String, T>> patternStream = + CEPOperatorUtils.createPatternStream(inputStream, pattern, lateDataOutputTag); + this.patternStream = patternStream; return patternStream.flatMap( new PatternFlatSelectMapper<>( @@ -229,11 +267,13 @@ public class PatternStream<T> { * function or the resulting timeout events from the pattern flat timeout function wrapped in an * {@link Either} type. 
*/ - public <L, R> DataStream<Either<L, R>> flatSelect( + public <L, R> SingleOutputStreamOperator<Either<L, R>> flatSelect( final PatternFlatTimeoutFunction<T, L> patternFlatTimeoutFunction, final PatternFlatSelectFunction<T, R> patternFlatSelectFunction) { - DataStream<Either<Tuple2<Map<String, T>, Long>, Map<String, T>>> patternStream = CEPOperatorUtils.createTimeoutPatternStream(inputStream, pattern); + SingleOutputStreamOperator<Either<Tuple2<Map<String, T>, Long>, Map<String, T>>> patternStream = + CEPOperatorUtils.createTimeoutPatternStream(inputStream, pattern, lateDataOutputTag); + this.patternStream = patternStream; TypeInformation<L> leftTypeInfo = TypeExtractor.getUnaryOperatorReturnType( patternFlatTimeoutFunction, @@ -264,6 +304,18 @@ public class PatternStream<T> { } /** + * Gets the {@link DataStream} that contains the elements that are emitted from an operation + * into the side output with the given {@link OutputTag}. + * + * @param sideOutputTag The tag identifying a specific side output. + */ + public <X> DataStream<X> getSideOutput(OutputTag<X> sideOutputTag) { + Preconditions.checkNotNull(patternStream, "The operator has not been initialized. " + + "To have the late element side output, you have to first define the main output using select() or flatSelect()."); + return patternStream.getSideOutput(sideOutputTag); + } + + /** * Wrapper for a {@link PatternSelectFunction}. 
 * * @param <T> Type of the input elements http://git-wip-us.apache.org/repos/asf/flink/blob/48890285/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/AbstractKeyedCEPPatternOperator.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/AbstractKeyedCEPPatternOperator.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/AbstractKeyedCEPPatternOperator.java index 3e18660..b232dbb 100644 --- a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/AbstractKeyedCEPPatternOperator.java +++ b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/AbstractKeyedCEPPatternOperator.java @@ -40,6 +40,7 @@ import org.apache.flink.streaming.api.watermark.Watermark; import org.apache.flink.streaming.runtime.streamrecord.StreamElement; import org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer; import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; +import org.apache.flink.util.OutputTag; import org.apache.flink.util.Preconditions; import java.io.IOException; @@ -89,6 +90,19 @@ public abstract class AbstractKeyedCEPPatternOperator<IN, KEY, OUT> private final NFACompiler.NFAFactory<IN> nfaFactory; /** + * {@link OutputTag} to use for late arriving events. Elements whose + * timestamp is smaller than the last seen watermark will + * be emitted to this. + */ + private final OutputTag<IN> lateDataOutputTag; + + /** + * The last seen watermark. This will be used to + * decide if an incoming element is late or not. + */ + private long lastWatermark; + + /** * A flag used in the case of migration that indicates if * we are restoring from an old keyed or non-keyed operator. 
*/ @@ -100,6 +114,7 @@ public abstract class AbstractKeyedCEPPatternOperator<IN, KEY, OUT> final KeySelector<IN, KEY> keySelector, final TypeSerializer<KEY> keySerializer, final NFACompiler.NFAFactory<IN> nfaFactory, + final OutputTag<IN> lateDataOutputTag, final boolean migratingFromOldKeyedOperator) { this.inputSerializer = Preconditions.checkNotNull(inputSerializer); @@ -107,11 +122,9 @@ public abstract class AbstractKeyedCEPPatternOperator<IN, KEY, OUT> this.keySelector = Preconditions.checkNotNull(keySelector); this.keySerializer = Preconditions.checkNotNull(keySerializer); this.nfaFactory = Preconditions.checkNotNull(nfaFactory); - this.migratingFromOldKeyedOperator = migratingFromOldKeyedOperator; - } - public TypeSerializer<IN> getInputSerializer() { - return inputSerializer; + this.lateDataOutputTag = lateDataOutputTag; + this.migratingFromOldKeyedOperator = migratingFromOldKeyedOperator; } @Override @@ -159,6 +172,7 @@ public abstract class AbstractKeyedCEPPatternOperator<IN, KEY, OUT> // 3) advance the time to the current watermark, so that expired patterns are discarded. // 4) update the stored state for the key, by only storing the new NFA and priority queue iff they // have state to be used later. + // 5) update the last seen watermark. // STEP 1 PriorityQueue<StreamRecord<IN>> priorityQueue = getPriorityQueue(); @@ -180,6 +194,9 @@ public abstract class AbstractKeyedCEPPatternOperator<IN, KEY, OUT> if (priorityQueue.isEmpty() && nfa.isEmpty()) { watermarkCallbackService.unregisterKeyFromWatermarkCallback(key); } + + // STEP 5 + updateLastSeenWatermark(watermark); } }, keySerializer @@ -196,19 +213,45 @@ public abstract class AbstractKeyedCEPPatternOperator<IN, KEY, OUT> updateNFA(nfa); } else { - getInternalWatermarkCallbackService().registerKeyForWatermarkCallback(keySelector.getKey(element.getValue())); - PriorityQueue<StreamRecord<IN>> priorityQueue = getPriorityQueue(); + // In event-time processing we assume correctness of the watermark. 
+ // Events with timestamp smaller than the last seen watermark are considered late. + // Late events are put in a dedicated side output, if the user has specified one. + + if (element.getTimestamp() >= lastWatermark) { - // event time processing - // we have to buffer the elements until we receive the proper watermark - if (getExecutionConfig().isObjectReuseEnabled()) { - // copy the StreamRecord so that it cannot be changed - priorityQueue.offer(new StreamRecord<IN>(inputSerializer.copy(element.getValue()), element.getTimestamp())); + // we have an event with a valid timestamp, so + // we buffer it until we receive the proper watermark. + + getInternalWatermarkCallbackService().registerKeyForWatermarkCallback(keySelector.getKey(element.getValue())); + + PriorityQueue<StreamRecord<IN>> priorityQueue = getPriorityQueue(); + if (getExecutionConfig().isObjectReuseEnabled()) { + // copy the StreamRecord so that it cannot be changed + priorityQueue.offer(new StreamRecord<>(inputSerializer.copy(element.getValue()), element.getTimestamp())); + } else { + priorityQueue.offer(element); + } + updatePriorityQueue(priorityQueue); } else { - priorityQueue.offer(element); + sideOutputLateElement(element); } - updatePriorityQueue(priorityQueue); + } + } + + private void updateLastSeenWatermark(Watermark watermark) { + this.lastWatermark = watermark.getTimestamp(); + } + + /** + * Puts the provided late element in the dedicated side output, + * if the user has specified one. + * + * @param element The late element. 
+ */ + private void sideOutputLateElement(StreamRecord<IN> element) { + if (lateDataOutputTag != null) { + output.collect(lateDataOutputTag, element); } } http://git-wip-us.apache.org/repos/asf/flink/blob/48890285/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/CEPOperatorUtils.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/CEPOperatorUtils.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/CEPOperatorUtils.java index a5eef45..c12680f 100644 --- a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/CEPOperatorUtils.java +++ b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/CEPOperatorUtils.java @@ -33,7 +33,9 @@ import org.apache.flink.cep.pattern.Pattern; import org.apache.flink.streaming.api.TimeCharacteristic; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.datastream.KeyedStream; +import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator; import org.apache.flink.types.Either; +import org.apache.flink.util.OutputTag; import java.util.Map; @@ -46,7 +48,7 @@ public class CEPOperatorUtils { * @return Data stream containing fully matched event sequences stored in a {@link Map}. The * events are indexed by their associated names of the pattern. 
*/ - public static <K, T> DataStream<Map<String, T>> createPatternStream(DataStream<T> inputStream, Pattern<T, ?> pattern) { + public static <K, T> SingleOutputStreamOperator<Map<String, T>> createPatternStream(DataStream<T> inputStream, Pattern<T, ?> pattern, OutputTag<T> lateDataOutputTag) { final TypeSerializer<T> inputSerializer = inputStream.getType().createSerializer(inputStream.getExecutionConfig()); // check whether we use processing time @@ -55,7 +57,7 @@ public class CEPOperatorUtils { // compile our pattern into a NFAFactory to instantiate NFAs later on final NFACompiler.NFAFactory<T> nfaFactory = NFACompiler.compileFactory(pattern, inputSerializer, false); - final DataStream<Map<String, T>> patternStream; + final SingleOutputStreamOperator<Map<String, T>> patternStream; if (inputStream instanceof KeyedStream) { // We have to use the KeyedCEPPatternOperator which can deal with keyed input streams @@ -73,6 +75,7 @@ public class CEPOperatorUtils { keySelector, keySerializer, nfaFactory, + lateDataOutputTag, true)); } else { @@ -88,6 +91,7 @@ public class CEPOperatorUtils { keySelector, keySerializer, nfaFactory, + lateDataOutputTag, false )).forceNonParallel(); } @@ -104,7 +108,8 @@ public class CEPOperatorUtils { * @return Data stream containing fully matched and partially matched event sequences wrapped in * a {@link Either} instance. 
*/ - public static <K, T> DataStream<Either<Tuple2<Map<String, T>, Long>, Map<String, T>>> createTimeoutPatternStream(DataStream<T> inputStream, Pattern<T, ?> pattern) { + public static <K, T> SingleOutputStreamOperator<Either<Tuple2<Map<String, T>, Long>, Map<String, T>>> createTimeoutPatternStream( + DataStream<T> inputStream, Pattern<T, ?> pattern, OutputTag<T> lateDataOutputTag) { final TypeSerializer<T> inputSerializer = inputStream.getType().createSerializer(inputStream.getExecutionConfig()); @@ -114,7 +119,7 @@ public class CEPOperatorUtils { // compile our pattern into a NFAFactory to instantiate NFAs later on final NFACompiler.NFAFactory<T> nfaFactory = NFACompiler.compileFactory(pattern, inputSerializer, true); - final DataStream<Either<Tuple2<Map<String, T>, Long>, Map<String, T>>> patternStream; + final SingleOutputStreamOperator<Either<Tuple2<Map<String, T>, Long>, Map<String, T>>> patternStream; final TypeInformation<Map<String, T>> rightTypeInfo = (TypeInformation<Map<String, T>>) (TypeInformation<?>) TypeExtractor.getForClass(Map.class); final TypeInformation<Tuple2<Map<String, T>, Long>> leftTypeInfo = new TupleTypeInfo<>(rightTypeInfo, BasicTypeInfo.LONG_TYPE_INFO); @@ -130,12 +135,13 @@ public class CEPOperatorUtils { patternStream = keyedStream.transform( "TimeoutKeyedCEPPatternOperator", eitherTypeInformation, - new TimeoutKeyedCEPPatternOperator<T, K>( + new TimeoutKeyedCEPPatternOperator<>( inputSerializer, isProcessingTime, keySelector, keySerializer, nfaFactory, + lateDataOutputTag, true)); } else { @@ -151,6 +157,7 @@ public class CEPOperatorUtils { keySelector, keySerializer, nfaFactory, + lateDataOutputTag, false )).forceNonParallel(); } http://git-wip-us.apache.org/repos/asf/flink/blob/48890285/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/KeyedCEPPatternOperator.java ---------------------------------------------------------------------- diff --git 
a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/KeyedCEPPatternOperator.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/KeyedCEPPatternOperator.java index 21cee23..532bba3 100644 --- a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/KeyedCEPPatternOperator.java +++ b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/KeyedCEPPatternOperator.java @@ -24,6 +24,7 @@ import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.cep.nfa.NFA; import org.apache.flink.cep.nfa.compiler.NFACompiler; import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; +import org.apache.flink.util.OutputTag; import java.util.Collection; import java.util.Iterator; @@ -46,9 +47,10 @@ public class KeyedCEPPatternOperator<IN, KEY> extends AbstractKeyedCEPPatternOpe KeySelector<IN, KEY> keySelector, TypeSerializer<KEY> keySerializer, NFACompiler.NFAFactory<IN> nfaFactory, + OutputTag<IN> lateDataOutputTag, boolean migratingFromOldKeyedOperator) { - super(inputSerializer, isProcessingTime, keySelector, keySerializer, nfaFactory, migratingFromOldKeyedOperator); + super(inputSerializer, isProcessingTime, keySelector, keySerializer, nfaFactory, lateDataOutputTag, migratingFromOldKeyedOperator); } @Override http://git-wip-us.apache.org/repos/asf/flink/blob/48890285/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/TimeoutKeyedCEPPatternOperator.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/TimeoutKeyedCEPPatternOperator.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/TimeoutKeyedCEPPatternOperator.java index c6fba55..933bfd3 100644 --- a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/TimeoutKeyedCEPPatternOperator.java +++ 
b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/operator/TimeoutKeyedCEPPatternOperator.java @@ -25,6 +25,7 @@ import org.apache.flink.cep.nfa.NFA; import org.apache.flink.cep.nfa.compiler.NFACompiler; import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; import org.apache.flink.types.Either; +import org.apache.flink.util.OutputTag; import java.util.Collection; import java.util.Map; @@ -46,9 +47,10 @@ public class TimeoutKeyedCEPPatternOperator<IN, KEY> extends AbstractKeyedCEPPat KeySelector<IN, KEY> keySelector, TypeSerializer<KEY> keySerializer, NFACompiler.NFAFactory<IN> nfaFactory, + OutputTag<IN> lateDataOutputTag, boolean migratingFromOldKeyedOperator) { - super(inputSerializer, isProcessingTime, keySelector, keySerializer, nfaFactory, migratingFromOldKeyedOperator); + super(inputSerializer, isProcessingTime, keySelector, keySerializer, nfaFactory, lateDataOutputTag, migratingFromOldKeyedOperator); } @Override http://git-wip-us.apache.org/repos/asf/flink/blob/48890285/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/CEPITCase.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/CEPITCase.java b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/CEPITCase.java index 42117ee..a5015df 100644 --- a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/CEPITCase.java +++ b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/CEPITCase.java @@ -33,6 +33,7 @@ import org.apache.flink.streaming.api.windowing.time.Time; import org.apache.flink.streaming.util.StreamingMultipleProgramsTestBase; import org.apache.flink.types.Either; +import org.apache.flink.util.OutputTag; import org.junit.After; import org.junit.Before; import org.junit.Rule; @@ -47,6 +48,9 @@ public class CEPITCase extends StreamingMultipleProgramsTestBase { private String resultPath; private String expected; + private String lateEventPath; 
+    private String expectedLateEvents;
+
     @Rule
     public TemporaryFolder tempFolder = new TemporaryFolder();
 
@@ -54,11 +58,15 @@ public class CEPITCase extends StreamingMultipleProgramsTestBase {
     public void before() throws Exception {
         resultPath = tempFolder.newFile().toURI().toString();
         expected = "";
+
+        lateEventPath = tempFolder.newFile().toURI().toString();
+        expectedLateEvents = "";
     }
 
     @After
     public void after() throws Exception {
         compareResultsByLinesInMemory(expected, resultPath);
+        compareResultsByLinesInMemory(expectedLateEvents, lateEventPath);
     }
 
     /**
@@ -572,4 +580,99 @@ public class CEPITCase extends StreamingMultipleProgramsTestBase {
 
         env.execute();
     }
+
+    @Test
+    public void testLateEventSideOutput() throws Exception {
+        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
+        env.setParallelism(1);
+
+        // (Event, timestamp)
+        DataStream<Event> input = env.fromElements(
+            Tuple2.of(new Event(1, "start", 1.0), 1L),
+            Tuple2.of(new Event(2, "middle", 2.0), 2L),
+            Tuple2.of(new Event(3, "end", 3.0), 15L),
+            Tuple2.of(new Event(4, "middle", 5.0), 7L),
+            Tuple2.of(new Event(6, "start", 1.0), 21L),
+            Tuple2.of(new Event(5, "middle", 5.0), 10L),
+            Tuple2.of(new Event(7, "middle", 2.0), 22L),
+            Tuple2.of(new Event(8, "end", 3.0), 23L)
+        ).assignTimestampsAndWatermarks(new AssignerWithPunctuatedWatermarks<Tuple2<Event,Long>>() {
+
+            @Override
+            public long extractTimestamp(Tuple2<Event, Long> element, long previousTimestamp) {
+                return element.f1;
+            }
+
+            @Override
+            public Watermark checkAndGetNextWatermark(Tuple2<Event, Long> lastElement, long extractedTimestamp) {
+                return lastElement.f0.getName().equals("end") ? new Watermark(extractedTimestamp) : null;
+            }
+
+        }).map(new MapFunction<Tuple2<Event, Long>, Event>() {
+
+            @Override
+            public Event map(Tuple2<Event, Long> value) throws Exception {
+                return value.f0;
+            }
+        });
+
+        Pattern<Event, ?> pattern = Pattern.<Event>begin("start").where(new SimpleCondition<Event>() {
+
+            @Override
+            public boolean filter(Event value) throws Exception {
+                return value.getName().equals("start");
+            }
+        }).followedBy("middle").where(new SimpleCondition<Event>() {
+
+            @Override
+            public boolean filter(Event value) throws Exception {
+                return value.getName().equals("middle");
+            }
+        }).followedBy("end").where(new SimpleCondition<Event>() {
+
+            @Override
+            public boolean filter(Event value) throws Exception {
+                return value.getName().equals("end");
+            }
+        });
+
+        final OutputTag<Event> lateOutputTag = new OutputTag<Event>("late-data"){};
+
+        PatternStream<Event> patternStream = CEP.pattern(input, pattern).withLateDataOutputTag(lateOutputTag);
+        DataStream<String> result = patternStream.select(
+            new PatternSelectFunction<Event, String>() {
+
+                @Override
+                public String select(Map<String, Event> pattern) {
+                    StringBuilder builder = new StringBuilder();
+
+                    builder.append(pattern.get("start").getId()).append(",")
+                        .append(pattern.get("middle").getId()).append(",")
+                        .append(pattern.get("end").getId());
+                    return builder.toString();
+                }
+            }
+        );
+
+        DataStream<Event> lateEvents = patternStream.getSideOutput(lateOutputTag);
+
+        // we just care for the late events in this test.
+        lateEvents.map(
+            new MapFunction<Event, Integer>() {
+
+                @Override
+                public Integer map(Event value) throws Exception {
+                    return value.getId();
+                }
+            }
+        ).writeAsText(lateEventPath, FileSystem.WriteMode.OVERWRITE);
+
+        // the expected sequence of late event ids
+        expectedLateEvents = "4\n5";
+
+        result.writeAsText(resultPath, FileSystem.WriteMode.OVERWRITE);
+        expected = "1,2,3\n1,2,8\n1,7,8\n6,7,8";
+        env.execute();
+    }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/48890285/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPMigration11to13Test.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPMigration11to13Test.java b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPMigration11to13Test.java
index b83eb3c..4e05fcf 100644
--- a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPMigration11to13Test.java
+++ b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPMigration11to13Test.java
@@ -103,6 +103,7 @@ public class CEPMigration11to13Test {
                     keySelector,
                     IntSerializer.INSTANCE,
                     new NFAFactory(),
+                    null,
                     true),
                 keySelector,
                 BasicTypeInfo.INT_TYPE_INFO);
@@ -177,6 +178,7 @@ public class CEPMigration11to13Test {
                     keySelector,
                     ByteSerializer.INSTANCE,
                     new NFAFactory(),
+                    null,
                     false),
                 keySelector,
                 BasicTypeInfo.BYTE_TYPE_INFO);

http://git-wip-us.apache.org/repos/asf/flink/blob/48890285/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPMigration12to13Test.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPMigration12to13Test.java b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPMigration12to13Test.java
index dbe4230..8249535 100644
--- a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPMigration12to13Test.java
+++ b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPMigration12to13Test.java
@@ -118,6 +118,7 @@ public class CEPMigration12to13Test {
                     keySelector,
                     IntSerializer.INSTANCE,
                     new NFAFactory(),
+                    null,
                     true),
                 keySelector,
                 BasicTypeInfo.INT_TYPE_INFO);
@@ -233,6 +234,7 @@ public class CEPMigration12to13Test {
                     keySelector,
                     IntSerializer.INSTANCE,
                     new NFAFactory(),
+                    null,
                     true),
                 keySelector,
                 BasicTypeInfo.INT_TYPE_INFO);
@@ -353,6 +355,7 @@ public class CEPMigration12to13Test {
                     keySelector,
                     IntSerializer.INSTANCE,
                     new SinglePatternNFAFactory(),
+                    null,
                     true),
                 keySelector,
                 BasicTypeInfo.INT_TYPE_INFO);

http://git-wip-us.apache.org/repos/asf/flink/blob/48890285/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPOperatorTest.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPOperatorTest.java b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPOperatorTest.java
index 726c8b8..d599ec9 100644
--- a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPOperatorTest.java
+++ b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPOperatorTest.java
@@ -225,6 +225,7 @@ public class CEPOperatorTest extends TestLogger {
                     keySelector,
                     IntSerializer.INSTANCE,
                     new NFAFactory(true),
+                    null,
                     true),
                 keySelector,
                 BasicTypeInfo.INT_TYPE_INFO);
@@ -482,6 +483,7 @@ public class CEPOperatorTest extends TestLogger {
             keySelector,
             IntSerializer.INSTANCE,
             new NFAFactory(),
+            null,
             true);
     }

http://git-wip-us.apache.org/repos/asf/flink/blob/48890285/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPRescalingTest.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPRescalingTest.java b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPRescalingTest.java
index 2c86648..a048183 100644
--- a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPRescalingTest.java
+++ b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/operator/CEPRescalingTest.java
@@ -346,6 +346,7 @@ public class CEPRescalingTest {
                     keySelector,
                     BasicTypeInfo.INT_TYPE_INFO.createSerializer(new ExecutionConfig()),
                     new NFAFactory(),
+                    null,
                     true),
                 keySelector,
                 BasicTypeInfo.INT_TYPE_INFO,
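
Note on testLateEventSideOutput: the expected late ids "4\n5" follow from the rule in the commit message (an event is late when its timestamp is smaller than the last seen watermark) combined with the test's punctuated assigner, which emits a watermark only on "end" events. The plain-Java sketch below replays the same (id, name, timestamp) input without Flink to show this. It is an illustration under those assumptions, not the operator's actual code: LateEventSketch, TimedEvent, sampleEvents and lateIds are hypothetical names, and the strict "<" comparison is taken from the commit message wording.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class LateEventSketch {

    /** Hypothetical stand-in for the (Event, timestamp) tuples used in the test. */
    static final class TimedEvent {
        final int id;
        final String name;
        final long timestamp;

        TimedEvent(int id, String name, long timestamp) {
            this.id = id;
            this.name = name;
            this.timestamp = timestamp;
        }
    }

    /** Same ids, names and timestamps, in the same arrival order, as the test input. */
    static List<TimedEvent> sampleEvents() {
        return Arrays.asList(
            new TimedEvent(1, "start", 1L),
            new TimedEvent(2, "middle", 2L),
            new TimedEvent(3, "end", 15L),
            new TimedEvent(4, "middle", 7L),
            new TimedEvent(6, "start", 21L),
            new TimedEvent(5, "middle", 10L),
            new TimedEvent(7, "middle", 22L),
            new TimedEvent(8, "end", 23L));
    }

    /** Returns the ids of events whose timestamp is smaller than the last seen watermark. */
    static List<Integer> lateIds(List<TimedEvent> events) {
        long lastWatermark = Long.MIN_VALUE;
        List<Integer> late = new ArrayList<>();
        for (TimedEvent e : events) {
            // Assumed lateness rule (from the commit message): strictly behind the watermark.
            if (e.timestamp < lastWatermark) {
                late.add(e.id);
            }
            // Mirrors the test's assigner: only "end" events carry a watermark,
            // and it follows the element, so the element itself is not late.
            if (e.name.equals("end")) {
                lastWatermark = Math.max(lastWatermark, e.timestamp);
            }
        }
        return late;
    }

    public static void main(String[] args) {
        System.out.println(lateIds(sampleEvents())); // prints [4, 5]
    }
}
```

Event 3 ("end", 15) advances the watermark to 15, so events 4 (timestamp 7) and 5 (timestamp 10) arrive behind it and would go to the side output. Events 6, 7 and 8 are ahead of the watermark and take part in matching.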