[hotfix] Make GC test more strict in WindowOperatorContractTest
Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/662ed33d Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/662ed33d Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/662ed33d Branch: refs/heads/table-retraction Commit: 662ed33d8f5baed95035b8176daf95a1caa0b278 Parents: fad201b Author: Aljoscha Krettek <aljoscha.kret...@gmail.com> Authored: Sat Mar 25 16:59:31 2017 +0100 Committer: Aljoscha Krettek <aljoscha.kret...@gmail.com> Committed: Sat Mar 25 16:59:31 2017 +0100 ---------------------------------------------------------------------- .../windowing/WindowOperatorContractTest.java | 21 ++++++++++++++++++-- 1 file changed, 19 insertions(+), 2 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/662ed33d/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorContractTest.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorContractTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorContractTest.java index faab505..3ae8f37 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorContractTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorContractTest.java @@ -1853,13 +1853,20 @@ public abstract class WindowOperatorContractTest extends TestLogger { } private void testGarbageCollectionTimer(TimeDomainAdaptor timeAdaptor) throws Exception { + long allowedLateness = 20L; + + if (timeAdaptor instanceof ProcessingTimeAdaptor) { + // we don't have allowed lateness for processing time + allowedLateness = 0; + } + WindowAssigner<Integer, 
TimeWindow> mockAssigner = mockTimeWindowAssigner(); timeAdaptor.setIsEventTime(mockAssigner); Trigger<Integer, TimeWindow> mockTrigger = mockTrigger(); InternalWindowFunction<Iterable<Integer>, Void, Integer, TimeWindow> mockWindowFunction = mockWindowFunction(); KeyedOneInputStreamOperatorTestHarness<Integer, Integer, Void> testHarness = - createWindowOperator(mockAssigner, mockTrigger, 20L, mockWindowFunction); + createWindowOperator(mockAssigner, mockTrigger, allowedLateness, mockWindowFunction); testHarness.open(); @@ -1879,7 +1886,17 @@ public abstract class WindowOperatorContractTest extends TestLogger { verify(mockTrigger, never()).clear(anyTimeWindow(), anyTriggerContext()); - timeAdaptor.advanceTime(testHarness, 19 + 20); // 19 is maxTime of the window + // verify that we can still fire on the GC timer + timeAdaptor.shouldFireOnTime(mockTrigger); + + timeAdaptor.advanceTime(testHarness, 19 + allowedLateness); // 19 is maxTime of the window + + // ensure that our trigger is still called + timeAdaptor.verifyTriggerCallback(mockTrigger, times(1), 19L + allowedLateness, null); + + // ensure that our window function is called a last time if the trigger + // fires on the GC timer + verify(mockWindowFunction, times(1)).process(eq(0), eq(new TimeWindow(0, 20)), anyInternalWindowContext(), intIterable(0), WindowOperatorContractTest.<Void>anyCollector()); verify(mockTrigger, times(1)).clear(anyTimeWindow(), anyTriggerContext());