[hotfix] Make GC test more strict in WindowOperatorContractTest

Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/662ed33d
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/662ed33d
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/662ed33d

Branch: refs/heads/table-retraction
Commit: 662ed33d8f5baed95035b8176daf95a1caa0b278
Parents: fad201b
Author: Aljoscha Krettek <aljoscha.kret...@gmail.com>
Authored: Sat Mar 25 16:59:31 2017 +0100
Committer: Aljoscha Krettek <aljoscha.kret...@gmail.com>
Committed: Sat Mar 25 16:59:31 2017 +0100

----------------------------------------------------------------------
 .../windowing/WindowOperatorContractTest.java   | 21 ++++++++++++++++++--
 1 file changed, 19 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/662ed33d/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorContractTest.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorContractTest.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorContractTest.java
index faab505..3ae8f37 100644
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorContractTest.java
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorContractTest.java
@@ -1853,13 +1853,20 @@ public abstract class WindowOperatorContractTest 
extends TestLogger {
        }
 
        private void testGarbageCollectionTimer(TimeDomainAdaptor timeAdaptor) 
throws Exception {
+               long allowedLateness = 20L;
+
+               if (timeAdaptor instanceof ProcessingTimeAdaptor) {
+                       // we don't have allowed lateness for processing time
+                       allowedLateness = 0;
+               }
+
                WindowAssigner<Integer, TimeWindow> mockAssigner = 
mockTimeWindowAssigner();
                timeAdaptor.setIsEventTime(mockAssigner);
                Trigger<Integer, TimeWindow> mockTrigger = mockTrigger();
                InternalWindowFunction<Iterable<Integer>, Void, Integer, 
TimeWindow> mockWindowFunction = mockWindowFunction();
 
                KeyedOneInputStreamOperatorTestHarness<Integer, Integer, Void> 
testHarness =
-                               createWindowOperator(mockAssigner, mockTrigger, 
20L, mockWindowFunction);
+                               createWindowOperator(mockAssigner, mockTrigger, 
allowedLateness, mockWindowFunction);
 
                testHarness.open();
 
@@ -1879,7 +1886,17 @@ public abstract class WindowOperatorContractTest extends 
TestLogger {
 
                verify(mockTrigger, never()).clear(anyTimeWindow(), 
anyTriggerContext());
 
-               timeAdaptor.advanceTime(testHarness, 19 + 20); // 19 is maxTime 
of the window
+               // verify that we can still fire on the GC timer
+               timeAdaptor.shouldFireOnTime(mockTrigger);
+
+               timeAdaptor.advanceTime(testHarness, 19 + allowedLateness); // 
19 is maxTime of the window
+
+               // ensure that our trigger is still called
+               timeAdaptor.verifyTriggerCallback(mockTrigger, times(1), 19L + 
allowedLateness, null);
+
+               // ensure that our window function is called a last timer if 
the trigger
+               // fires on the GC timer
+               verify(mockWindowFunction, times(1)).process(eq(0), eq(new 
TimeWindow(0, 20)), anyInternalWindowContext(), intIterable(0), 
WindowOperatorContractTest.<Void>anyCollector());
 
                verify(mockTrigger, times(1)).clear(anyTimeWindow(), 
anyTriggerContext());
 

Reply via email to