[hotfix] Fix various small issues in WindowOperatorContractTest
Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/25d52e4d Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/25d52e4d Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/25d52e4d Branch: refs/heads/table-retraction Commit: 25d52e4df216dc54d2d82e1f0b449871bda4ba74 Parents: 3c4b156 Author: Aljoscha Krettek <aljoscha.kret...@gmail.com> Authored: Wed Mar 22 17:02:15 2017 +0100 Committer: Tzu-Li (Gordon) Tai <tzuli...@apache.org> Committed: Thu Mar 23 23:29:01 2017 +0800 ---------------------------------------------------------------------- .../windowing/WindowOperatorContractTest.java | 97 +++++++++----------- 1 file changed, 44 insertions(+), 53 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/25d52e4d/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorContractTest.java ---------------------------------------------------------------------- diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorContractTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorContractTest.java index abc7b3e..aaea8b1 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorContractTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorContractTest.java @@ -19,6 +19,7 @@ package org.apache.flink.streaming.runtime.operators.windowing; import static org.apache.flink.streaming.runtime.operators.windowing.StreamRecordMatchers.isStreamRecord; +import static org.hamcrest.Matchers.contains; import static org.hamcrest.Matchers.containsInAnyOrder; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertThat; @@ -36,7 +37,7 @@ import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; -import com.google.common.collect.Lists; +import java.util.ArrayList; import java.util.Arrays; import java.util.Collection; import java.util.Collections; @@ -155,7 +156,7 @@ public abstract class WindowOperatorContractTest extends TestLogger { @SuppressWarnings("unchecked") static Iterable<Integer> intIterable(Integer... values) { - return (Iterable<Integer>) argThat(containsInAnyOrder(values)); + return (Iterable<Integer>) argThat(contains(values)); } static TimeWindow anyTimeWindow() { @@ -247,55 +248,55 @@ public abstract class WindowOperatorContractTest extends TestLogger { } private static <T> void shouldContinueOnElement(Trigger<T, TimeWindow> mockTrigger) throws Exception { - when(mockTrigger.onElement(Matchers.<T>anyObject(), anyLong(), Matchers.<TimeWindow>any(), anyTriggerContext())).thenReturn(TriggerResult.CONTINUE); + when(mockTrigger.onElement(Matchers.<T>anyObject(), anyLong(), anyTimeWindow(), anyTriggerContext())).thenReturn(TriggerResult.CONTINUE); } private static <T> void shouldFireOnElement(Trigger<T, TimeWindow> mockTrigger) throws Exception { - when(mockTrigger.onElement(Matchers.<T>anyObject(), anyLong(), Matchers.<TimeWindow>any(), anyTriggerContext())).thenReturn(TriggerResult.FIRE); + when(mockTrigger.onElement(Matchers.<T>anyObject(), anyLong(), anyTimeWindow(), anyTriggerContext())).thenReturn(TriggerResult.FIRE); } private static <T> void shouldPurgeOnElement(Trigger<T, TimeWindow> mockTrigger) throws Exception { - when(mockTrigger.onElement(Matchers.<T>anyObject(), anyLong(), Matchers.<TimeWindow>any(), anyTriggerContext())).thenReturn(TriggerResult.PURGE); + when(mockTrigger.onElement(Matchers.<T>anyObject(), anyLong(), anyTimeWindow(), anyTriggerContext())).thenReturn(TriggerResult.PURGE); } private static <T> void shouldFireAndPurgeOnElement(Trigger<T, TimeWindow> mockTrigger) throws Exception { - when(mockTrigger.onElement(Matchers.<T>anyObject(), anyLong(), Matchers.<TimeWindow>any(), anyTriggerContext())).thenReturn(TriggerResult.FIRE_AND_PURGE); + when(mockTrigger.onElement(Matchers.<T>anyObject(), anyLong(), anyTimeWindow(), anyTriggerContext())).thenReturn(TriggerResult.FIRE_AND_PURGE); } private static <T> void shouldContinueOnEventTime(Trigger<T, TimeWindow> mockTrigger) throws Exception { - when(mockTrigger.onEventTime(anyLong(), Matchers.<TimeWindow>any(), anyTriggerContext())).thenReturn(TriggerResult.CONTINUE); + when(mockTrigger.onEventTime(anyLong(), anyTimeWindow(), anyTriggerContext())).thenReturn(TriggerResult.CONTINUE); } private static <T> void shouldFireOnEventTime(Trigger<T, TimeWindow> mockTrigger) throws Exception { - when(mockTrigger.onEventTime(anyLong(), Matchers.<TimeWindow>any(), anyTriggerContext())).thenReturn(TriggerResult.FIRE); + when(mockTrigger.onEventTime(anyLong(), anyTimeWindow(), anyTriggerContext())).thenReturn(TriggerResult.FIRE); } private static <T> void shouldPurgeOnEventTime(Trigger<T, TimeWindow> mockTrigger) throws Exception { - when(mockTrigger.onEventTime(anyLong(), Matchers.<TimeWindow>any(), anyTriggerContext())).thenReturn(TriggerResult.PURGE); + when(mockTrigger.onEventTime(anyLong(), anyTimeWindow(), anyTriggerContext())).thenReturn(TriggerResult.PURGE); } private static <T> void shouldFireAndPurgeOnEventTime(Trigger<T, TimeWindow> mockTrigger) throws Exception { - when(mockTrigger.onEventTime(anyLong(), Matchers.<TimeWindow>any(), anyTriggerContext())).thenReturn(TriggerResult.FIRE_AND_PURGE); + when(mockTrigger.onEventTime(anyLong(), anyTimeWindow(), anyTriggerContext())).thenReturn(TriggerResult.FIRE_AND_PURGE); } private static <T> void shouldContinueOnProcessingTime(Trigger<T, TimeWindow> mockTrigger) throws Exception { - when(mockTrigger.onProcessingTime(anyLong(), Matchers.<TimeWindow>any(), anyTriggerContext())).thenReturn(TriggerResult.CONTINUE); + when(mockTrigger.onProcessingTime(anyLong(), anyTimeWindow(), anyTriggerContext())).thenReturn(TriggerResult.CONTINUE); } private static <T> void shouldFireOnProcessingTime(Trigger<T, TimeWindow> mockTrigger) throws Exception { - when(mockTrigger.onProcessingTime(anyLong(), Matchers.<TimeWindow>any(), anyTriggerContext())).thenReturn(TriggerResult.FIRE); + when(mockTrigger.onProcessingTime(anyLong(), anyTimeWindow(), anyTriggerContext())).thenReturn(TriggerResult.FIRE); } private static <T> void shouldPurgeOnProcessingTime(Trigger<T, TimeWindow> mockTrigger) throws Exception { - when(mockTrigger.onProcessingTime(anyLong(), Matchers.<TimeWindow>any(), anyTriggerContext())).thenReturn(TriggerResult.PURGE); + when(mockTrigger.onProcessingTime(anyLong(), anyTimeWindow(), anyTriggerContext())).thenReturn(TriggerResult.PURGE); } private static <T> void shouldFireAndPurgeOnProcessingTime(Trigger<T, TimeWindow> mockTrigger) throws Exception { - when(mockTrigger.onProcessingTime(anyLong(), Matchers.<TimeWindow>any(), anyTriggerContext())).thenReturn(TriggerResult.FIRE_AND_PURGE); + when(mockTrigger.onProcessingTime(anyLong(), anyTimeWindow(), anyTriggerContext())).thenReturn(TriggerResult.FIRE_AND_PURGE); } /** - * Verify that there is no late-date side output if the {@code WindowAssigner} does + * Verify that there is no late-data side output if the {@code WindowAssigner} does * not assign any windows. */ @Test @@ -346,7 +347,7 @@ public abstract class WindowOperatorContractTest extends TestLogger { verify(mockAssigner, times(1)).assignWindows(eq(0), eq(5L), anyAssignerContext()); assertThat(testHarness.getSideOutput(lateOutputTag), - containsInAnyOrder(isStreamRecord(0, 5L))); + contains(isStreamRecord(0, 5L))); // we should also see side output if the WindowAssigner assigns no windows when(mockAssigner.assignWindows(anyInt(), anyLong(), anyAssignerContext())) @@ -358,7 +359,7 @@ public abstract class WindowOperatorContractTest extends TestLogger { verify(mockAssigner, times(1)).assignWindows(eq(0), eq(10L), anyAssignerContext()); assertThat(testHarness.getSideOutput(lateOutputTag), - containsInAnyOrder(isStreamRecord(0, 5L), isStreamRecord(0, 10L))); + contains(isStreamRecord(0, 5L), isStreamRecord(0, 10L))); } @@ -520,7 +521,7 @@ public abstract class WindowOperatorContractTest extends TestLogger { verify(mockWindowFunction, times(1)).apply(eq(0), eq(new TimeWindow(0, 2)), intIterable(0), WindowOperatorContractTest.<String>anyCollector()); assertThat(testHarness.extractOutputStreamRecords(), - containsInAnyOrder(isStreamRecord("Hallo", 1L), isStreamRecord("Ciao", 1L))); + contains(isStreamRecord("Hallo", 1L), isStreamRecord("Ciao", 1L))); } @Test @@ -534,7 +535,7 @@ public abstract class WindowOperatorContractTest extends TestLogger { } - private void testEmittingFromWindowFunction(TimeDomainAdaptor timeAdaptor) throws Exception { + private void testEmittingFromWindowFunction(TimeDomainAdaptor timeAdaptor) throws Exception { WindowAssigner<Integer, TimeWindow> mockAssigner = mockTimeWindowAssigner(); Trigger<Integer, TimeWindow> mockTrigger = mockTrigger(); @@ -573,7 +574,7 @@ public abstract class WindowOperatorContractTest extends TestLogger { verify(mockWindowFunction, times(1)).apply(eq(0), eq(new TimeWindow(0, 2)), intIterable(0), WindowOperatorContractTest.<String>anyCollector()); assertThat(testHarness.extractOutputStreamRecords(), - containsInAnyOrder(isStreamRecord("Hallo", 1L), isStreamRecord("Ciao", 1L))); + contains(isStreamRecord("Hallo", 1L), isStreamRecord("Ciao", 1L))); } @Test @@ -1067,9 +1068,9 @@ public abstract class WindowOperatorContractTest extends TestLogger { /** * Verify that we neither invoke the trigger nor the window function if a timer - * for an empty merging window. + * for an empty merging window fires. */ - public void testNoTimerFiringForPurgedMergingWindow(final TimeDomainAdaptor timeAdaptor) throws Exception { + private void testNoTimerFiringForPurgedMergingWindow(final TimeDomainAdaptor timeAdaptor) throws Exception { MergingWindowAssigner<Integer, TimeWindow> mockAssigner = mockMergingAssigner(); timeAdaptor.setIsEventTime(mockAssigner); @@ -1133,7 +1134,7 @@ public abstract class WindowOperatorContractTest extends TestLogger { * Verify that we neither invoke the trigger nor the window function if a timer * fires for a merging window that was already garbage collected. */ - public void testNoTimerFiringForGarbageCollectedMergingWindow(final TimeDomainAdaptor timeAdaptor) throws Exception { + private void testNoTimerFiringForGarbageCollectedMergingWindow(final TimeDomainAdaptor timeAdaptor) throws Exception { MergingWindowAssigner<Integer, TimeWindow> mockAssigner = mockMergingAssigner(); timeAdaptor.setIsEventTime(mockAssigner); @@ -1166,7 +1167,6 @@ public abstract class WindowOperatorContractTest extends TestLogger { } }).when(mockTrigger).onElement(Matchers.<Integer>anyObject(), anyLong(), anyTimeWindow(), anyTriggerContext()); - testHarness.processElement(new StreamRecord<>(0, 0L)); assertEquals(2, testHarness.numKeyedStateEntries()); // window contents and merging window set @@ -1311,7 +1311,7 @@ public abstract class WindowOperatorContractTest extends TestLogger { testDeletedTimerDoesNotFire(new ProcessingTimeAdaptor()); } - public void testDeletedTimerDoesNotFire(TimeDomainAdaptor timeAdaptor) throws Exception { + private void testDeletedTimerDoesNotFire(TimeDomainAdaptor timeAdaptor) throws Exception { WindowAssigner<Integer, TimeWindow> mockAssigner = mockTimeWindowAssigner(); timeAdaptor.setIsEventTime(mockAssigner); @@ -1372,8 +1372,9 @@ public abstract class WindowOperatorContractTest extends TestLogger { testHarness.processElement(new StreamRecord<>(0, 0L)); - verify(mockAssigner).mergeWindows(eq(Lists.newArrayList(new TimeWindow(2, 4))), anyMergeCallback()); - verify(mockAssigner).mergeWindows(eq(Lists.newArrayList(new TimeWindow(2, 4), new TimeWindow(0, 2))), anyMergeCallback()); + verify(mockAssigner).mergeWindows(eq(Collections.singletonList(new TimeWindow(2, 4))), anyMergeCallback()); + verify(mockAssigner).mergeWindows(eq(Collections.singletonList(new TimeWindow(2, 4))), anyMergeCallback()); + verify(mockAssigner, times(2)).mergeWindows(anyCollection(), anyMergeCallback()); @@ -1392,7 +1393,7 @@ public abstract class WindowOperatorContractTest extends TestLogger { /** * Verify that windows are merged eagerly, if possible. */ - public void testWindowsAreMergedEagerly(final TimeDomainAdaptor timeAdaptor) throws Exception { + private void testWindowsAreMergedEagerly(final TimeDomainAdaptor timeAdaptor) throws Exception { // in this test we only have one state window and windows are eagerly // merged into the first window @@ -1456,8 +1457,8 @@ public abstract class WindowOperatorContractTest extends TestLogger { shouldMergeWindows( mockAssigner, - Lists.newArrayList(new TimeWindow(0, 2), new TimeWindow(2, 4)), - Lists.newArrayList(new TimeWindow(0, 2), new TimeWindow(2, 4)), + new ArrayList<>(Arrays.asList(new TimeWindow(0, 2), new TimeWindow(2, 4))), + new ArrayList<>(Arrays.asList(new TimeWindow(0, 2), new TimeWindow(2, 4))), new TimeWindow(0, 4)); // don't register a timer or update state in onElement, this checks @@ -1491,7 +1492,7 @@ public abstract class WindowOperatorContractTest extends TestLogger { * Verify that we only keep one of the underlying state windows. This test also verifies that * GC timers are correctly deleted when merging windows. */ - public void testMergingOfExistingWindows(final TimeDomainAdaptor timeAdaptor) throws Exception { + private void testMergingOfExistingWindows(final TimeDomainAdaptor timeAdaptor) throws Exception { MergingWindowAssigner<Integer, TimeWindow> mockAssigner = mockMergingAssigner(); timeAdaptor.setIsEventTime(mockAssigner); @@ -1562,8 +1563,8 @@ public abstract class WindowOperatorContractTest extends TestLogger { shouldMergeWindows( mockAssigner, - Lists.newArrayList(new TimeWindow(0, 2), new TimeWindow(2, 4), new TimeWindow(1, 3)), - Lists.newArrayList(new TimeWindow(0, 2), new TimeWindow(2, 4), new TimeWindow(1, 3)), + new ArrayList<>(Arrays.asList(new TimeWindow(0, 2), new TimeWindow(2, 4), new TimeWindow(1, 3))), + new ArrayList<>(Arrays.asList(new TimeWindow(0, 2), new TimeWindow(2, 4), new TimeWindow(1, 3))), new TimeWindow(0, 4)); testHarness.processElement(new StreamRecord<>(0, 0L)); @@ -1618,7 +1619,7 @@ public abstract class WindowOperatorContractTest extends TestLogger { testOnTimePurgeDoesNotCleanupMergingSet(new ProcessingTimeAdaptor()); } - public void testOnTimePurgeDoesNotCleanupMergingSet(TimeDomainAdaptor timeAdaptor) throws Exception { + private void testOnTimePurgeDoesNotCleanupMergingSet(TimeDomainAdaptor timeAdaptor) throws Exception { MergingWindowAssigner<Integer, TimeWindow> mockAssigner = mockMergingAssigner(); timeAdaptor.setIsEventTime(mockAssigner); @@ -1663,7 +1664,7 @@ public abstract class WindowOperatorContractTest extends TestLogger { testNoGarbageCollectionTimerForGlobalWindow(new ProcessingTimeAdaptor()); } - public void testNoGarbageCollectionTimerForGlobalWindow(TimeDomainAdaptor timeAdaptor) throws Exception { + private void testNoGarbageCollectionTimerForGlobalWindow(TimeDomainAdaptor timeAdaptor) throws Exception { WindowAssigner<Integer, GlobalWindow> mockAssigner = mockGlobalWindowAssigner(); timeAdaptor.setIsEventTime(mockAssigner); @@ -1767,7 +1768,7 @@ public abstract class WindowOperatorContractTest extends TestLogger { testGarbageCollectionTimer(new ProcessingTimeAdaptor()); } - public void testGarbageCollectionTimer(TimeDomainAdaptor timeAdaptor) throws Exception { + private void testGarbageCollectionTimer(TimeDomainAdaptor timeAdaptor) throws Exception { WindowAssigner<Integer, TimeWindow> mockAssigner = mockTimeWindowAssigner(); timeAdaptor.setIsEventTime(mockAssigner); Trigger<Integer, TimeWindow> mockTrigger = mockTrigger(); @@ -1812,7 +1813,7 @@ public abstract class WindowOperatorContractTest extends TestLogger { testTriggerTimerAndGarbageCollectionTimerCoincide(new ProcessingTimeAdaptor()); } - public void testTriggerTimerAndGarbageCollectionTimerCoincide(final TimeDomainAdaptor timeAdaptor) throws Exception { + private void testTriggerTimerAndGarbageCollectionTimerCoincide(final TimeDomainAdaptor timeAdaptor) throws Exception { WindowAssigner<Integer, TimeWindow> mockAssigner = mockTimeWindowAssigner(); timeAdaptor.setIsEventTime(mockAssigner); Trigger<Integer, TimeWindow> mockTrigger = mockTrigger(); @@ -1868,7 +1869,7 @@ public abstract class WindowOperatorContractTest extends TestLogger { testStateAndTimerCleanupAtEventTimeGarbageCollection(new ProcessingTimeAdaptor()); } - public void testStateAndTimerCleanupAtEventTimeGarbageCollection(final TimeDomainAdaptor timeAdaptor) throws Exception { + private void testStateAndTimerCleanupAtEventTimeGarbageCollection(final TimeDomainAdaptor timeAdaptor) throws Exception { WindowAssigner<Integer, TimeWindow> mockAssigner = mockTimeWindowAssigner(); timeAdaptor.setIsEventTime(mockAssigner); Trigger<Integer, TimeWindow> mockTrigger = mockTrigger(); @@ -1938,7 +1939,7 @@ public abstract class WindowOperatorContractTest extends TestLogger { * Verify that we correctly clean up even when a purging trigger has purged * window state. */ - public void testStateAndTimerCleanupAtEventTimeGCWithPurgingTrigger(final TimeDomainAdaptor timeAdaptor) throws Exception { + private void testStateAndTimerCleanupAtEventTimeGCWithPurgingTrigger(final TimeDomainAdaptor timeAdaptor) throws Exception { WindowAssigner<Integer, TimeWindow> mockAssigner = mockTimeWindowAssigner(); timeAdaptor.setIsEventTime(mockAssigner); Trigger<Integer, TimeWindow> mockTrigger = mockTrigger(); @@ -2009,7 +2010,7 @@ public abstract class WindowOperatorContractTest extends TestLogger { * Verify that we correctly clean up even when a purging trigger has purged * window state. */ - public void testStateAndTimerCleanupAtGarbageCollectionWithPurgingTriggerAndMergingWindows(final TimeDomainAdaptor timeAdaptor) throws Exception { + private void testStateAndTimerCleanupAtGarbageCollectionWithPurgingTriggerAndMergingWindows(final TimeDomainAdaptor timeAdaptor) throws Exception { WindowAssigner<Integer, TimeWindow> mockAssigner = mockMergingAssigner(); timeAdaptor.setIsEventTime(mockAssigner); Trigger<Integer, TimeWindow> mockTrigger = mockTrigger(); @@ -2075,7 +2076,7 @@ public abstract class WindowOperatorContractTest extends TestLogger { testMergingWindowSetClearedAtGarbageCollection(new ProcessingTimeAdaptor()); } - public void testMergingWindowSetClearedAtGarbageCollection(TimeDomainAdaptor timeAdaptor) throws Exception { + private void testMergingWindowSetClearedAtGarbageCollection(TimeDomainAdaptor timeAdaptor) throws Exception { WindowAssigner<Integer, TimeWindow> mockAssigner = mockMergingAssigner(); timeAdaptor.setIsEventTime(mockAssigner); Trigger<Integer, TimeWindow> mockTrigger = mockTrigger(); @@ -2120,12 +2121,7 @@ public abstract class WindowOperatorContractTest extends TestLogger { assertEquals(0, testHarness.getOutput().size()); assertEquals(0, testHarness.numKeyedStateEntries()); - doAnswer(new Answer<TriggerResult>() { - @Override - public TriggerResult answer(InvocationOnMock invocation) throws Exception { - return TriggerResult.FIRE; - } - }).when(mockTrigger).onElement(Matchers.<Integer>anyObject(), anyLong(), anyTimeWindow(), anyTriggerContext()); + shouldFireOnElement(mockTrigger); // 20 is just at the limit, window.maxTime() is 1 and allowed lateness is 20 testHarness.processWatermark(new Watermark(20)); @@ -2159,12 +2155,7 @@ public abstract class WindowOperatorContractTest extends TestLogger { assertEquals(0, testHarness.getOutput().size()); assertEquals(0, testHarness.numKeyedStateEntries()); - doAnswer(new Answer<TriggerResult>() { - @Override - public TriggerResult answer(InvocationOnMock invocation) throws Exception { - return TriggerResult.FIRE; - } - }).when(mockTrigger).onElement(Matchers.<Integer>anyObject(), anyLong(), anyTimeWindow(), anyTriggerContext()); + shouldFireOnElement(mockTrigger); // window.maxTime() == 1 plus 20L of allowed lateness testHarness.processWatermark(new Watermark(21));