Repository: flink Updated Branches: refs/heads/master bf6df12d8 -> e34fea53b
[FLINK-4134] Retire Late Windows/Elements in WindowOperator Before this change, when processing an element that would end up in a late window (when using a MergingWindowAssigner), the element's window would be added to the MergingWindowSet. After determining that the window was late, the element was dropped, but the window would not be removed from the MergingWindowSet. This could lead to problems with other elements being merged into these "phantom" windows and causing triggers to be added for empty windows. This commit also fixes the same code in EvictingWindowOperator. Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/e34fea53 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/e34fea53 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/e34fea53 Branch: refs/heads/master Commit: e34fea53b69cd7c40eca716e3f2d7a68ac3cbc18 Parents: bf6df12 Author: Stefan Richter <s.rich...@data-artisans.com> Authored: Mon Jul 4 14:32:42 2016 +0200 Committer: Aljoscha Krettek <aljoscha.kret...@gmail.com> Committed: Mon Jul 4 17:24:08 2016 +0200 ---------------------------------------------------------------------- .../runtime/operators/windowing/EvictingWindowOperator.java | 1 + .../streaming/runtime/operators/windowing/WindowOperator.java | 1 + .../streaming/runtime/operators/windowing/WindowOperatorTest.java | 2 ++ 3 files changed, 4 insertions(+) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/e34fea53/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/EvictingWindowOperator.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/EvictingWindowOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/EvictingWindowOperator.java index d82fc85..3f2c6a3 100644 --- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/EvictingWindowOperator.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/EvictingWindowOperator.java @@ -133,6 +133,7 @@ public class EvictingWindowOperator<K, IN, OUT, W extends Window> extends Window // check if the window is already inactive if (isLate(actualWindow)) { LOG.info("Dropped element " + element + " for window " + actualWindow + " due to lateness."); + mergingWindows.retireWindow(actualWindow); continue; } http://git-wip-us.apache.org/repos/asf/flink/blob/e34fea53/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java index f06fd33..b6ca564 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java @@ -335,6 +335,7 @@ public class WindowOperator<K, IN, ACC, OUT, W extends Window> // drop if the window is already late if (isLate(actualWindow)) { LOG.info("Dropped element " + element+ " for window " + actualWindow + " due to lateness."); + mergingWindows.retireWindow(actualWindow); continue; } http://git-wip-us.apache.org/repos/asf/flink/blob/e34fea53/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorTest.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorTest.java 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorTest.java index 8ba6da2..ba335ee 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorTest.java @@ -1477,6 +1477,8 @@ public class WindowOperatorTest { // this is dropped as late testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), 10000)); + // this is also dropped as late (we test that they are not accidentally merged) + testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), 10100)); testHarness.processElement(new StreamRecord<>(new Tuple2<>("key2", 1), 14500)); testHarness.processWatermark(new Watermark(20000));