[hotfix] Fix various small issues in WindowOperatorContractTest

Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/25d52e4d
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/25d52e4d
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/25d52e4d

Branch: refs/heads/table-retraction
Commit: 25d52e4df216dc54d2d82e1f0b449871bda4ba74
Parents: 3c4b156
Author: Aljoscha Krettek <aljoscha.kret...@gmail.com>
Authored: Wed Mar 22 17:02:15 2017 +0100
Committer: Tzu-Li (Gordon) Tai <tzuli...@apache.org>
Committed: Thu Mar 23 23:29:01 2017 +0800

----------------------------------------------------------------------
 .../windowing/WindowOperatorContractTest.java   | 97 +++++++++-----------
 1 file changed, 44 insertions(+), 53 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/25d52e4d/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorContractTest.java
----------------------------------------------------------------------
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorContractTest.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorContractTest.java
index abc7b3e..aaea8b1 100644
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorContractTest.java
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperatorContractTest.java
@@ -19,6 +19,7 @@ package 
org.apache.flink.streaming.runtime.operators.windowing;
 
 
 import static 
org.apache.flink.streaming.runtime.operators.windowing.StreamRecordMatchers.isStreamRecord;
+import static org.hamcrest.Matchers.contains;
 import static org.hamcrest.Matchers.containsInAnyOrder;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertThat;
@@ -36,7 +37,7 @@ import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
 
-import com.google.common.collect.Lists;
+import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
 import java.util.Collections;
@@ -155,7 +156,7 @@ public abstract class WindowOperatorContractTest extends 
TestLogger {
 
        @SuppressWarnings("unchecked")
        static Iterable<Integer> intIterable(Integer... values) {
-               return (Iterable<Integer>) argThat(containsInAnyOrder(values));
+               return (Iterable<Integer>) argThat(contains(values));
        }
 
        static TimeWindow anyTimeWindow() {
@@ -247,55 +248,55 @@ public abstract class WindowOperatorContractTest extends 
TestLogger {
        }
 
        private static <T> void shouldContinueOnElement(Trigger<T, TimeWindow> 
mockTrigger) throws Exception {
-               when(mockTrigger.onElement(Matchers.<T>anyObject(), anyLong(), 
Matchers.<TimeWindow>any(), 
anyTriggerContext())).thenReturn(TriggerResult.CONTINUE);
+               when(mockTrigger.onElement(Matchers.<T>anyObject(), anyLong(), 
anyTimeWindow(), anyTriggerContext())).thenReturn(TriggerResult.CONTINUE);
        }
 
        private static <T> void shouldFireOnElement(Trigger<T, TimeWindow> 
mockTrigger) throws Exception {
-               when(mockTrigger.onElement(Matchers.<T>anyObject(), anyLong(), 
Matchers.<TimeWindow>any(), 
anyTriggerContext())).thenReturn(TriggerResult.FIRE);
+               when(mockTrigger.onElement(Matchers.<T>anyObject(), anyLong(), 
anyTimeWindow(), anyTriggerContext())).thenReturn(TriggerResult.FIRE);
        }
 
        private static <T> void shouldPurgeOnElement(Trigger<T, TimeWindow> 
mockTrigger) throws Exception {
-               when(mockTrigger.onElement(Matchers.<T>anyObject(), anyLong(), 
Matchers.<TimeWindow>any(), 
anyTriggerContext())).thenReturn(TriggerResult.PURGE);
+               when(mockTrigger.onElement(Matchers.<T>anyObject(), anyLong(), 
anyTimeWindow(), anyTriggerContext())).thenReturn(TriggerResult.PURGE);
        }
 
        private static <T> void shouldFireAndPurgeOnElement(Trigger<T, 
TimeWindow> mockTrigger) throws Exception {
-               when(mockTrigger.onElement(Matchers.<T>anyObject(), anyLong(), 
Matchers.<TimeWindow>any(), 
anyTriggerContext())).thenReturn(TriggerResult.FIRE_AND_PURGE);
+               when(mockTrigger.onElement(Matchers.<T>anyObject(), anyLong(), 
anyTimeWindow(), anyTriggerContext())).thenReturn(TriggerResult.FIRE_AND_PURGE);
        }
 
        private static <T> void shouldContinueOnEventTime(Trigger<T, 
TimeWindow> mockTrigger) throws Exception {
-               when(mockTrigger.onEventTime(anyLong(), 
Matchers.<TimeWindow>any(), 
anyTriggerContext())).thenReturn(TriggerResult.CONTINUE);
+               when(mockTrigger.onEventTime(anyLong(), anyTimeWindow(), 
anyTriggerContext())).thenReturn(TriggerResult.CONTINUE);
        }
 
        private static <T> void shouldFireOnEventTime(Trigger<T, TimeWindow> 
mockTrigger) throws Exception {
-               when(mockTrigger.onEventTime(anyLong(), 
Matchers.<TimeWindow>any(), 
anyTriggerContext())).thenReturn(TriggerResult.FIRE);
+               when(mockTrigger.onEventTime(anyLong(), anyTimeWindow(), 
anyTriggerContext())).thenReturn(TriggerResult.FIRE);
        }
 
        private static <T> void shouldPurgeOnEventTime(Trigger<T, TimeWindow> 
mockTrigger) throws Exception {
-               when(mockTrigger.onEventTime(anyLong(), 
Matchers.<TimeWindow>any(), 
anyTriggerContext())).thenReturn(TriggerResult.PURGE);
+               when(mockTrigger.onEventTime(anyLong(), anyTimeWindow(), 
anyTriggerContext())).thenReturn(TriggerResult.PURGE);
        }
 
        private static <T> void shouldFireAndPurgeOnEventTime(Trigger<T, 
TimeWindow> mockTrigger) throws Exception {
-               when(mockTrigger.onEventTime(anyLong(), 
Matchers.<TimeWindow>any(), 
anyTriggerContext())).thenReturn(TriggerResult.FIRE_AND_PURGE);
+               when(mockTrigger.onEventTime(anyLong(), anyTimeWindow(), 
anyTriggerContext())).thenReturn(TriggerResult.FIRE_AND_PURGE);
        }
 
        private static <T> void shouldContinueOnProcessingTime(Trigger<T, 
TimeWindow> mockTrigger) throws Exception {
-               when(mockTrigger.onProcessingTime(anyLong(), 
Matchers.<TimeWindow>any(), 
anyTriggerContext())).thenReturn(TriggerResult.CONTINUE);
+               when(mockTrigger.onProcessingTime(anyLong(), anyTimeWindow(), 
anyTriggerContext())).thenReturn(TriggerResult.CONTINUE);
        }
 
        private static <T> void shouldFireOnProcessingTime(Trigger<T, 
TimeWindow> mockTrigger) throws Exception {
-               when(mockTrigger.onProcessingTime(anyLong(), 
Matchers.<TimeWindow>any(), 
anyTriggerContext())).thenReturn(TriggerResult.FIRE);
+               when(mockTrigger.onProcessingTime(anyLong(), anyTimeWindow(), 
anyTriggerContext())).thenReturn(TriggerResult.FIRE);
        }
 
        private static <T> void shouldPurgeOnProcessingTime(Trigger<T, 
TimeWindow> mockTrigger) throws Exception {
-               when(mockTrigger.onProcessingTime(anyLong(), 
Matchers.<TimeWindow>any(), 
anyTriggerContext())).thenReturn(TriggerResult.PURGE);
+               when(mockTrigger.onProcessingTime(anyLong(), anyTimeWindow(), 
anyTriggerContext())).thenReturn(TriggerResult.PURGE);
        }
 
        private static <T> void shouldFireAndPurgeOnProcessingTime(Trigger<T, 
TimeWindow> mockTrigger) throws Exception {
-               when(mockTrigger.onProcessingTime(anyLong(), 
Matchers.<TimeWindow>any(), 
anyTriggerContext())).thenReturn(TriggerResult.FIRE_AND_PURGE);
+               when(mockTrigger.onProcessingTime(anyLong(), anyTimeWindow(), 
anyTriggerContext())).thenReturn(TriggerResult.FIRE_AND_PURGE);
        }
 
        /**
-        * Verify that there is no late-date side output if the {@code 
WindowAssigner} does
+        * Verify that there is no late-data side output if the {@code 
WindowAssigner} does
         * not assign any windows.
         */
        @Test
@@ -346,7 +347,7 @@ public abstract class WindowOperatorContractTest extends 
TestLogger {
                verify(mockAssigner, times(1)).assignWindows(eq(0), eq(5L), 
anyAssignerContext());
 
                assertThat(testHarness.getSideOutput(lateOutputTag),
-                               containsInAnyOrder(isStreamRecord(0, 5L)));
+                               contains(isStreamRecord(0, 5L)));
 
                // we should also see side output if the WindowAssigner assigns 
no windows
                when(mockAssigner.assignWindows(anyInt(), anyLong(), 
anyAssignerContext()))
@@ -358,7 +359,7 @@ public abstract class WindowOperatorContractTest extends 
TestLogger {
                verify(mockAssigner, times(1)).assignWindows(eq(0), eq(10L), 
anyAssignerContext());
 
                assertThat(testHarness.getSideOutput(lateOutputTag),
-                               containsInAnyOrder(isStreamRecord(0, 5L), 
isStreamRecord(0, 10L)));
+                               contains(isStreamRecord(0, 5L), 
isStreamRecord(0, 10L)));
 
        }
 
@@ -520,7 +521,7 @@ public abstract class WindowOperatorContractTest extends 
TestLogger {
                verify(mockWindowFunction, times(1)).apply(eq(0), eq(new 
TimeWindow(0, 2)), intIterable(0), 
WindowOperatorContractTest.<String>anyCollector());
 
                assertThat(testHarness.extractOutputStreamRecords(),
-                               containsInAnyOrder(isStreamRecord("Hallo", 1L), 
isStreamRecord("Ciao", 1L)));
+                               contains(isStreamRecord("Hallo", 1L), 
isStreamRecord("Ciao", 1L)));
        }
 
        @Test
@@ -534,7 +535,7 @@ public abstract class WindowOperatorContractTest extends 
TestLogger {
        }
 
 
-       private  void testEmittingFromWindowFunction(TimeDomainAdaptor 
timeAdaptor) throws Exception {
+       private void testEmittingFromWindowFunction(TimeDomainAdaptor 
timeAdaptor) throws Exception {
 
                WindowAssigner<Integer, TimeWindow> mockAssigner = 
mockTimeWindowAssigner();
                Trigger<Integer, TimeWindow> mockTrigger = mockTrigger();
@@ -573,7 +574,7 @@ public abstract class WindowOperatorContractTest extends 
TestLogger {
                verify(mockWindowFunction, times(1)).apply(eq(0), eq(new 
TimeWindow(0, 2)), intIterable(0), 
WindowOperatorContractTest.<String>anyCollector());
 
                assertThat(testHarness.extractOutputStreamRecords(),
-                               containsInAnyOrder(isStreamRecord("Hallo", 1L), 
isStreamRecord("Ciao", 1L)));
+                               contains(isStreamRecord("Hallo", 1L), 
isStreamRecord("Ciao", 1L)));
        }
 
        @Test
@@ -1067,9 +1068,9 @@ public abstract class WindowOperatorContractTest extends 
TestLogger {
 
        /**
         * Verify that we neither invoke the trigger nor the window function if 
a timer
-        * for an empty merging window.
+        * for an empty merging window fires.
         */
-       public void testNoTimerFiringForPurgedMergingWindow(final 
TimeDomainAdaptor timeAdaptor) throws Exception {
+       private void testNoTimerFiringForPurgedMergingWindow(final 
TimeDomainAdaptor timeAdaptor) throws Exception {
 
                MergingWindowAssigner<Integer, TimeWindow> mockAssigner = 
mockMergingAssigner();
                timeAdaptor.setIsEventTime(mockAssigner);
@@ -1133,7 +1134,7 @@ public abstract class WindowOperatorContractTest extends 
TestLogger {
         * Verify that we neither invoke the trigger nor the window function if 
a timer
         * fires for a merging window that was already garbage collected.
         */
-       public void testNoTimerFiringForGarbageCollectedMergingWindow(final 
TimeDomainAdaptor timeAdaptor) throws Exception {
+       private void testNoTimerFiringForGarbageCollectedMergingWindow(final 
TimeDomainAdaptor timeAdaptor) throws Exception {
 
                MergingWindowAssigner<Integer, TimeWindow> mockAssigner = 
mockMergingAssigner();
                timeAdaptor.setIsEventTime(mockAssigner);
@@ -1166,7 +1167,6 @@ public abstract class WindowOperatorContractTest extends 
TestLogger {
                        }
                }).when(mockTrigger).onElement(Matchers.<Integer>anyObject(), 
anyLong(), anyTimeWindow(), anyTriggerContext());
 
-
                testHarness.processElement(new StreamRecord<>(0, 0L));
 
                assertEquals(2, testHarness.numKeyedStateEntries()); // window 
contents and merging window set
@@ -1311,7 +1311,7 @@ public abstract class WindowOperatorContractTest extends 
TestLogger {
                testDeletedTimerDoesNotFire(new ProcessingTimeAdaptor());
        }
 
-       public void testDeletedTimerDoesNotFire(TimeDomainAdaptor timeAdaptor) 
throws Exception {
+       private void testDeletedTimerDoesNotFire(TimeDomainAdaptor timeAdaptor) 
throws Exception {
 
                WindowAssigner<Integer, TimeWindow> mockAssigner = 
mockTimeWindowAssigner();
                timeAdaptor.setIsEventTime(mockAssigner);
@@ -1372,8 +1372,9 @@ public abstract class WindowOperatorContractTest extends 
TestLogger {
 
                testHarness.processElement(new StreamRecord<>(0, 0L));
 
-               verify(mockAssigner).mergeWindows(eq(Lists.newArrayList(new 
TimeWindow(2, 4))), anyMergeCallback());
-               verify(mockAssigner).mergeWindows(eq(Lists.newArrayList(new 
TimeWindow(2, 4), new TimeWindow(0, 2))), anyMergeCallback());
+               
verify(mockAssigner).mergeWindows(eq(Collections.singletonList(new 
TimeWindow(2, 4))), anyMergeCallback());
+               
verify(mockAssigner).mergeWindows(eq(Collections.singletonList(new 
TimeWindow(2, 4))), anyMergeCallback());
+
                verify(mockAssigner, times(2)).mergeWindows(anyCollection(), 
anyMergeCallback());
 
 
@@ -1392,7 +1393,7 @@ public abstract class WindowOperatorContractTest extends 
TestLogger {
        /**
         * Verify that windows are merged eagerly, if possible.
         */
-       public void testWindowsAreMergedEagerly(final TimeDomainAdaptor 
timeAdaptor) throws Exception {
+       private void testWindowsAreMergedEagerly(final TimeDomainAdaptor 
timeAdaptor) throws Exception {
                // in this test we only have one state window and windows are 
eagerly
                // merged into the first window
 
@@ -1456,8 +1457,8 @@ public abstract class WindowOperatorContractTest extends 
TestLogger {
 
                shouldMergeWindows(
                                mockAssigner,
-                               Lists.newArrayList(new TimeWindow(0, 2), new 
TimeWindow(2, 4)),
-                               Lists.newArrayList(new TimeWindow(0, 2), new 
TimeWindow(2, 4)),
+                               new ArrayList<>(Arrays.asList(new TimeWindow(0, 
2), new TimeWindow(2, 4))),
+                               new ArrayList<>(Arrays.asList(new TimeWindow(0, 
2), new TimeWindow(2, 4))),
                                new TimeWindow(0, 4));
 
                // don't register a timer or update state in onElement, this 
checks
@@ -1491,7 +1492,7 @@ public abstract class WindowOperatorContractTest extends 
TestLogger {
         * Verify that we only keep one of the underlying state windows. This 
test also verifies that
         * GC timers are correctly deleted when merging windows.
         */
-       public void testMergingOfExistingWindows(final TimeDomainAdaptor 
timeAdaptor) throws Exception {
+       private void testMergingOfExistingWindows(final TimeDomainAdaptor 
timeAdaptor) throws Exception {
 
                MergingWindowAssigner<Integer, TimeWindow> mockAssigner = 
mockMergingAssigner();
                timeAdaptor.setIsEventTime(mockAssigner);
@@ -1562,8 +1563,8 @@ public abstract class WindowOperatorContractTest extends 
TestLogger {
 
                shouldMergeWindows(
                                mockAssigner,
-                               Lists.newArrayList(new TimeWindow(0, 2), new 
TimeWindow(2, 4), new TimeWindow(1, 3)),
-                               Lists.newArrayList(new TimeWindow(0, 2), new 
TimeWindow(2, 4), new TimeWindow(1, 3)),
+                               new ArrayList<>(Arrays.asList(new TimeWindow(0, 
2), new TimeWindow(2, 4), new TimeWindow(1, 3))),
+                               new ArrayList<>(Arrays.asList(new TimeWindow(0, 
2), new TimeWindow(2, 4), new TimeWindow(1, 3))),
                                new TimeWindow(0, 4));
 
                testHarness.processElement(new StreamRecord<>(0, 0L));
@@ -1618,7 +1619,7 @@ public abstract class WindowOperatorContractTest extends 
TestLogger {
                testOnTimePurgeDoesNotCleanupMergingSet(new 
ProcessingTimeAdaptor());
        }
 
-       public void testOnTimePurgeDoesNotCleanupMergingSet(TimeDomainAdaptor 
timeAdaptor) throws Exception {
+       private void testOnTimePurgeDoesNotCleanupMergingSet(TimeDomainAdaptor 
timeAdaptor) throws Exception {
 
                MergingWindowAssigner<Integer, TimeWindow> mockAssigner = 
mockMergingAssigner();
                timeAdaptor.setIsEventTime(mockAssigner);
@@ -1663,7 +1664,7 @@ public abstract class WindowOperatorContractTest extends 
TestLogger {
                testNoGarbageCollectionTimerForGlobalWindow(new 
ProcessingTimeAdaptor());
        }
 
-       public void 
testNoGarbageCollectionTimerForGlobalWindow(TimeDomainAdaptor timeAdaptor) 
throws Exception {
+       private void 
testNoGarbageCollectionTimerForGlobalWindow(TimeDomainAdaptor timeAdaptor) 
throws Exception {
 
                WindowAssigner<Integer, GlobalWindow> mockAssigner = 
mockGlobalWindowAssigner();
                timeAdaptor.setIsEventTime(mockAssigner);
@@ -1767,7 +1768,7 @@ public abstract class WindowOperatorContractTest extends 
TestLogger {
                testGarbageCollectionTimer(new ProcessingTimeAdaptor());
        }
 
-       public void testGarbageCollectionTimer(TimeDomainAdaptor timeAdaptor) 
throws Exception {
+       private void testGarbageCollectionTimer(TimeDomainAdaptor timeAdaptor) 
throws Exception {
                WindowAssigner<Integer, TimeWindow> mockAssigner = 
mockTimeWindowAssigner();
                timeAdaptor.setIsEventTime(mockAssigner);
                Trigger<Integer, TimeWindow> mockTrigger = mockTrigger();
@@ -1812,7 +1813,7 @@ public abstract class WindowOperatorContractTest extends 
TestLogger {
                testTriggerTimerAndGarbageCollectionTimerCoincide(new 
ProcessingTimeAdaptor());
        }
 
-       public void testTriggerTimerAndGarbageCollectionTimerCoincide(final 
TimeDomainAdaptor timeAdaptor) throws Exception {
+       private void testTriggerTimerAndGarbageCollectionTimerCoincide(final 
TimeDomainAdaptor timeAdaptor) throws Exception {
                WindowAssigner<Integer, TimeWindow> mockAssigner = 
mockTimeWindowAssigner();
                timeAdaptor.setIsEventTime(mockAssigner);
                Trigger<Integer, TimeWindow> mockTrigger = mockTrigger();
@@ -1868,7 +1869,7 @@ public abstract class WindowOperatorContractTest extends 
TestLogger {
                testStateAndTimerCleanupAtEventTimeGarbageCollection(new 
ProcessingTimeAdaptor());
        }
 
-       public void testStateAndTimerCleanupAtEventTimeGarbageCollection(final 
TimeDomainAdaptor timeAdaptor) throws Exception {
+       private void testStateAndTimerCleanupAtEventTimeGarbageCollection(final 
TimeDomainAdaptor timeAdaptor) throws Exception {
                WindowAssigner<Integer, TimeWindow> mockAssigner = 
mockTimeWindowAssigner();
                timeAdaptor.setIsEventTime(mockAssigner);
                Trigger<Integer, TimeWindow> mockTrigger = mockTrigger();
@@ -1938,7 +1939,7 @@ public abstract class WindowOperatorContractTest extends 
TestLogger {
         * Verify that we correctly clean up even when a purging trigger has 
purged
         * window state.
         */
-       public void 
testStateAndTimerCleanupAtEventTimeGCWithPurgingTrigger(final TimeDomainAdaptor 
timeAdaptor) throws Exception {
+       private void 
testStateAndTimerCleanupAtEventTimeGCWithPurgingTrigger(final TimeDomainAdaptor 
timeAdaptor) throws Exception {
                WindowAssigner<Integer, TimeWindow> mockAssigner = 
mockTimeWindowAssigner();
                timeAdaptor.setIsEventTime(mockAssigner);
                Trigger<Integer, TimeWindow> mockTrigger = mockTrigger();
@@ -2009,7 +2010,7 @@ public abstract class WindowOperatorContractTest extends 
TestLogger {
         * Verify that we correctly clean up even when a purging trigger has 
purged
         * window state.
         */
-       public void 
testStateAndTimerCleanupAtGarbageCollectionWithPurgingTriggerAndMergingWindows(final
 TimeDomainAdaptor timeAdaptor) throws Exception {
+       private void 
testStateAndTimerCleanupAtGarbageCollectionWithPurgingTriggerAndMergingWindows(final
 TimeDomainAdaptor timeAdaptor) throws Exception {
                WindowAssigner<Integer, TimeWindow> mockAssigner = 
mockMergingAssigner();
                timeAdaptor.setIsEventTime(mockAssigner);
                Trigger<Integer, TimeWindow> mockTrigger = mockTrigger();
@@ -2075,7 +2076,7 @@ public abstract class WindowOperatorContractTest extends 
TestLogger {
                testMergingWindowSetClearedAtGarbageCollection(new 
ProcessingTimeAdaptor());
        }
 
-       public void 
testMergingWindowSetClearedAtGarbageCollection(TimeDomainAdaptor timeAdaptor) 
throws Exception {
+       private void 
testMergingWindowSetClearedAtGarbageCollection(TimeDomainAdaptor timeAdaptor) 
throws Exception {
                WindowAssigner<Integer, TimeWindow> mockAssigner = 
mockMergingAssigner();
                timeAdaptor.setIsEventTime(mockAssigner);
                Trigger<Integer, TimeWindow> mockTrigger = mockTrigger();
@@ -2120,12 +2121,7 @@ public abstract class WindowOperatorContractTest extends 
TestLogger {
                assertEquals(0, testHarness.getOutput().size());
                assertEquals(0, testHarness.numKeyedStateEntries());
 
-               doAnswer(new Answer<TriggerResult>() {
-                       @Override
-                       public TriggerResult answer(InvocationOnMock 
invocation) throws Exception {
-                               return TriggerResult.FIRE;
-                       }
-               }).when(mockTrigger).onElement(Matchers.<Integer>anyObject(), 
anyLong(), anyTimeWindow(), anyTriggerContext());
+               shouldFireOnElement(mockTrigger);
 
                // 20 is just at the limit, window.maxTime() is 1 and allowed 
lateness is 20
                testHarness.processWatermark(new Watermark(20));
@@ -2159,12 +2155,7 @@ public abstract class WindowOperatorContractTest extends 
TestLogger {
                assertEquals(0, testHarness.getOutput().size());
                assertEquals(0, testHarness.numKeyedStateEntries());
 
-               doAnswer(new Answer<TriggerResult>() {
-                       @Override
-                       public TriggerResult answer(InvocationOnMock 
invocation) throws Exception {
-                               return TriggerResult.FIRE;
-                       }
-               }).when(mockTrigger).onElement(Matchers.<Integer>anyObject(), 
anyLong(), anyTimeWindow(), anyTriggerContext());
+               shouldFireOnElement(mockTrigger);
 
                // window.maxTime() == 1 plus 20L of allowed lateness
                testHarness.processWatermark(new Watermark(21));

Reply via email to