Repository: flink
Updated Branches:
  refs/heads/master bf6df12d8 -> e34fea53b


[FLINK-4134] Retire Late Windows/Elements in WindowOperator

Before, when processing an element whose window turned out to be late
(when using a MergingWindowAssigner), that window was first added to the
MergingWindowSet. After the window was determined to be late, it was not
removed from the MergingWindowSet again. This could lead to other
elements being merged into these "phantom" windows and to triggers
being registered for empty windows.

This also applies the same fix to the equivalent code in EvictingWindowOperator.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/e34fea53
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/e34fea53
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/e34fea53

Branch: refs/heads/master
Commit: e34fea53b69cd7c40eca716e3f2d7a68ac3cbc18
Parents: bf6df12
Author: Stefan Richter <s.rich...@data-artisans.com>
Authored: Mon Jul 4 14:32:42 2016 +0200
Committer: Aljoscha Krettek <aljoscha.kret...@gmail.com>
Committed: Mon Jul 4 17:24:08 2016 +0200

----------------------------------------------------------------------
 .../runtime/operators/windowing/EvictingWindowOperator.java        | 1 +
 .../streaming/runtime/operators/windowing/WindowOperator.java      | 1 +
 .../streaming/runtime/operators/windowing/WindowOperatorTest.java  | 2 ++
 3 files changed, 4 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/e34fea53/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/EvictingWindowOperator.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/EvictingWindowOperator.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/EvictingWindowOperator.java
index d82fc85..3f2c6a3 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/EvictingWindowOperator.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/EvictingWindowOperator.java
@@ -133,6 +133,7 @@ public class EvictingWindowOperator<K, IN, OUT, W extends 
Window> extends Window
                                // check if the window is already inactive
                                if (isLate(actualWindow)) {
                                        LOG.info("Dropped element " + element + 
" for window " + actualWindow + " due to lateness.");
+                                       
mergingWindows.retireWindow(actualWindow);
                                        continue;
                                }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/e34fea53/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java
index f06fd33..b6ca564 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java
@@ -335,6 +335,7 @@ public class WindowOperator<K, IN, ACC, OUT, W extends 
Window>
                                // drop if the window is already late
                                if (isLate(actualWindow)) {
                                        LOG.info("Dropped element " + element+ 
" for window " + actualWindow + " due to lateness.");
+                                       
mergingWindows.retireWindow(actualWindow);
                                        continue;
                                }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/e34fea53/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorTest.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorTest.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorTest.java
index 8ba6da2..ba335ee 100644
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorTest.java
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorTest.java
@@ -1477,6 +1477,8 @@ public class WindowOperatorTest {
 
                // this is dropped as late
                testHarness.processElement(new StreamRecord<>(new 
Tuple2<>("key2", 1), 10000));
+               // this is also dropped as late (we test that they are not 
accidentally merged)
+               testHarness.processElement(new StreamRecord<>(new 
Tuple2<>("key2", 1), 10100));
 
                testHarness.processElement(new StreamRecord<>(new 
Tuple2<>("key2", 1), 14500));
                testHarness.processWatermark(new Watermark(20000));

Reply via email to