Repository: flink Updated Branches: refs/heads/master c5282cbcf -> 1b6baddca
[FLINK-6254] [cep] Same method name for late data outputs on PatternStream and WindowedStream Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/1b6baddc Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/1b6baddc Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/1b6baddc Branch: refs/heads/master Commit: 1b6baddca07bfba6093951e82ac9108cf4728f2a Parents: c5282cb Author: kl0u <[email protected]> Authored: Mon Apr 3 17:39:02 2017 +0200 Committer: kl0u <[email protected]> Committed: Tue Apr 4 11:29:09 2017 +0200 ---------------------------------------------------------------------- docs/dev/libs/cep.md | 10 +++++----- .../scala/org/apache/flink/cep/scala/PatternStream.scala | 4 ++-- .../src/main/java/org/apache/flink/cep/PatternStream.java | 2 +- .../src/test/java/org/apache/flink/cep/CEPITCase.java | 2 +- 4 files changed, 9 insertions(+), 9 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/1b6baddc/docs/dev/libs/cep.md ---------------------------------------------------------------------- diff --git a/docs/dev/libs/cep.md b/docs/dev/libs/cep.md index 643d6ee..15afdf5 100644 --- a/docs/dev/libs/cep.md +++ b/docs/dev/libs/cep.md @@ -790,13 +790,13 @@ in event time. To also guarantee that elements across watermarks are processed in event-time order, Flink's CEP library assumes *correctness of the watermark*, and considers as *late* elements whose timestamp is smaller than that of the last seen watermark. Late elements are not further processed but they can be redirected to a [side output] -({{ site.baseurl }}/dev/stream/side_output.html), dedicated to them. +({{ site.baseurl }}/dev/stream/side_output.html) dedicated to them. To access the stream of late elements, you first need to specify that you want to get the late data using -`.withLateDataOutputTag(OutputTag)` on the `PatternStream` returned using the `CEP.pattern(...)` call. If you do not do +`.sideOutputLateData(OutputTag)` on the `PatternStream` returned using the `CEP.pattern(...)` call. If you do not do so, the late elements will be silently dropped. Then, you can get the side-output stream using the `.getSideOutput(OutputTag)` on the aforementioned `PatternStream`, and providing as argument the output tag used in -the `.withLateDataOutputTag(OutputTag)`: +the `.sideOutputLateData(OutputTag)`: <div class="codetabs" markdown="1"> <div data-lang="java" markdown="1"> @@ -804,7 +804,7 @@ the `.withLateDataOutputTag(OutputTag)`: final OutputTag<T> lateOutputTag = new OutputTag<T>("late-data"){}; PatternStream<T> patternStream = CEP.pattern(...) - .withLateDataOutputTag(lateOutputTag); + .sideOutputLateData(lateOutputTag); // main output with matches DataStream<O> result = patternStream.select(...) @@ -819,7 +819,7 @@ DataStream<T> lateStream = patternStream.getSideOutput(lateOutputTag); val lateOutputTag = OutputTag[T]("late-data") val patternStream: PatternStream[T] = CEP.pattern(...) - .withLateDataOutputTag(lateOutputTag) + .sideOutputLateData(lateOutputTag) // main output with matches val result = patternStream.select(...) http://git-wip-us.apache.org/repos/asf/flink/blob/1b6baddc/flink-libraries/flink-cep-scala/src/main/scala/org/apache/flink/cep/scala/PatternStream.scala ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-cep-scala/src/main/scala/org/apache/flink/cep/scala/PatternStream.scala b/flink-libraries/flink-cep-scala/src/main/scala/org/apache/flink/cep/scala/PatternStream.scala index fb09c15..7c92886 100644 --- a/flink-libraries/flink-cep-scala/src/main/scala/org/apache/flink/cep/scala/PatternStream.scala +++ b/flink-libraries/flink-cep-scala/src/main/scala/org/apache/flink/cep/scala/PatternStream.scala @@ -57,8 +57,8 @@ class PatternStream[T](jPatternStream: JPatternStream[T]) { * timestamp is smaller than the last received watermark. */ @PublicEvolving - def withLateDataOutputTag(outputTag: OutputTag[T]): PatternStream[T] = { - jPatternStream.withLateDataOutputTag(outputTag) + def sideOutputLateData(outputTag: OutputTag[T]): PatternStream[T] = { + jPatternStream.sideOutputLateData(outputTag) lateDataOutputTag = outputTag this } http://git-wip-us.apache.org/repos/asf/flink/blob/1b6baddc/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/PatternStream.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/PatternStream.java b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/PatternStream.java index 87666a5..5f2327c 100644 --- a/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/PatternStream.java +++ b/flink-libraries/flink-cep/src/main/java/org/apache/flink/cep/PatternStream.java @@ -84,7 +84,7 @@ public class PatternStream<T> { * CEP library assumes correctness of the watermark, so an element is considered late if its * timestamp is smaller than the last received watermark. */ - public PatternStream<T> withLateDataOutputTag(OutputTag<T> outputTag) { + public PatternStream<T> sideOutputLateData(OutputTag<T> outputTag) { Preconditions.checkNotNull(outputTag, "Side output tag must not be null."); Preconditions.checkArgument(lateDataOutputTag == null, "The late side output tag has already been initialized to " + lateDataOutputTag + "."); http://git-wip-us.apache.org/repos/asf/flink/blob/1b6baddc/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/CEPITCase.java ---------------------------------------------------------------------- diff --git a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/CEPITCase.java b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/CEPITCase.java index a5015df..3a32175 100644 --- a/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/CEPITCase.java +++ b/flink-libraries/flink-cep/src/test/java/org/apache/flink/cep/CEPITCase.java @@ -639,7 +639,7 @@ public class CEPITCase extends StreamingMultipleProgramsTestBase { final OutputTag<Event> lateOutputTag = new OutputTag<Event>("late-data"){}; - PatternStream<Event> patternStream = CEP.pattern(input, pattern).withLateDataOutputTag(lateOutputTag); + PatternStream<Event> patternStream = CEP.pattern(input, pattern).sideOutputLateData(lateOutputTag); DataStream<String> result = patternStream.select( new PatternSelectFunction<Event, String>() {
