Add test for ReduceFnRunner GC time overflow
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/5bf732cd Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/5bf732cd Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/5bf732cd Branch: refs/heads/python-sdk Commit: 5bf732cd3e598321a5c51e1239eda0fe2877a65d Parents: 6058330 Author: Kenneth Knowles <k...@google.com> Authored: Tue Jun 14 16:04:10 2016 -0700 Committer: Davor Bonaci <da...@google.com> Committed: Mon Jun 20 15:14:29 2016 -0700 ---------------------------------------------------------------------- .../org/apache/beam/sdk/WindowMatchers.java | 5 ++ .../beam/sdk/util/ReduceFnRunnerTest.java | 68 ++++++++++++++++++++ 2 files changed, 73 insertions(+) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/5bf732cd/sdks/java/core/src/test/java/org/apache/beam/sdk/WindowMatchers.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/WindowMatchers.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/WindowMatchers.java index b47c32c..7a5e2fb 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/WindowMatchers.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/WindowMatchers.java @@ -47,6 +47,11 @@ public class WindowMatchers { return new WindowedValueMatcher<>(valueMatcher, timestampMatcher, Matchers.anything()); } + public static <T> Matcher<WindowedValue<? extends T>> isWindowedValue( + Matcher<? super T> valueMatcher) { + return new WindowedValueMatcher<>(valueMatcher, Matchers.anything(), Matchers.anything()); + } + public static <T> Matcher<WindowedValue<? 
extends T>> isSingleWindowedValue( T value, long timestamp, long windowStart, long windowEnd) { return WindowMatchers.<T>isSingleWindowedValue( http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/5bf732cd/sdks/java/core/src/test/java/org/apache/beam/sdk/util/ReduceFnRunnerTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/ReduceFnRunnerTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/ReduceFnRunnerTest.java index 0df4bc6..b7ec540 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/ReduceFnRunnerTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/ReduceFnRunnerTest.java @@ -18,6 +18,7 @@ package org.apache.beam.sdk.util; import static org.apache.beam.sdk.WindowMatchers.isSingleWindowedValue; +import static org.apache.beam.sdk.WindowMatchers.isWindowedValue; import static org.hamcrest.Matchers.contains; import static org.hamcrest.Matchers.containsInAnyOrder; @@ -51,6 +52,7 @@ import org.apache.beam.sdk.transforms.windowing.FixedWindows; import org.apache.beam.sdk.transforms.windowing.GlobalWindow; import org.apache.beam.sdk.transforms.windowing.GlobalWindows; import org.apache.beam.sdk.transforms.windowing.IntervalWindow; +import org.apache.beam.sdk.transforms.windowing.Never; import org.apache.beam.sdk.transforms.windowing.OutputTimeFns; import org.apache.beam.sdk.transforms.windowing.PaneInfo; import org.apache.beam.sdk.transforms.windowing.PaneInfo.Timing; @@ -59,6 +61,7 @@ import org.apache.beam.sdk.transforms.windowing.Sessions; import org.apache.beam.sdk.transforms.windowing.SlidingWindows; import org.apache.beam.sdk.transforms.windowing.Trigger; import org.apache.beam.sdk.transforms.windowing.Window.ClosingBehavior; +import org.apache.beam.sdk.transforms.windowing.WindowFn; import org.apache.beam.sdk.util.WindowingStrategy.AccumulationMode; import org.apache.beam.sdk.values.PCollectionView; import 
org.apache.beam.sdk.values.TimestampedValue; @@ -79,6 +82,7 @@ import org.mockito.MockitoAnnotations; import org.mockito.invocation.InvocationOnMock; import org.mockito.stubbing.Answer; +import java.util.Collection; import java.util.Iterator; import java.util.List; @@ -226,6 +230,70 @@ public class ReduceFnRunnerTest { tester.assertHasOnlyGlobalAndFinishedSetsFor(firstWindow); } + /** + * Tests that the garbage collection time for a fixed window does not overflow the end of time. + */ + @Test + public void testFixedWindowEndOfTimeGarbageCollection() throws Exception { + + Duration allowedLateness = Duration.standardDays(365); + Duration windowSize = Duration.millis(10); + WindowFn<Object, IntervalWindow> windowFn = FixedWindows.of(windowSize); + + // This timestamp falls into a window where the end of the window is before the end of the + // global window - the "end of time" - yet its expiration time is after. + final Instant elementTimestamp = + GlobalWindow.INSTANCE.maxTimestamp().minus(allowedLateness).plus(1); + + IntervalWindow window = Iterables.getOnlyElement( + windowFn.assignWindows( + windowFn.new AssignContext() { + @Override + public Object element() { + throw new UnsupportedOperationException(); + } + @Override + public Instant timestamp() { + return elementTimestamp; + } + + @Override + public Collection<? extends BoundedWindow> windows() { + throw new UnsupportedOperationException(); + } + })); + + assertTrue( + window.maxTimestamp().isBefore(GlobalWindow.INSTANCE.maxTimestamp())); + assertTrue( + window.maxTimestamp().plus(allowedLateness).isAfter(GlobalWindow.INSTANCE.maxTimestamp())); + + // Combining window set, discarding mode, end-of-window trigger that never fires late panes. 
+ ReduceFnTester<Integer, Integer, IntervalWindow> tester = + ReduceFnTester.combining( + windowFn, + AfterWatermark.pastEndOfWindow().withLateFirings(Never.ever()).buildTrigger(), + AccumulationMode.DISCARDING_FIRED_PANES, + new Sum.SumIntegerFn().<String>asKeyedFn(), + VarIntCoder.of(), + allowedLateness); + + tester.injectElements(TimestampedValue.of(13, elementTimestamp)); + + // Should fire ON_TIME pane and there will be a checkState that the cleanup time + // is prior to timestamp max value + tester.advanceInputWatermark(window.maxTimestamp()); + + // Nothing in the ON_TIME pane (not governed by triggers, but by ReduceFnRunner) + assertThat(tester.extractOutput(), emptyIterable()); + + tester.injectElements(TimestampedValue.of(42, elementTimestamp)); + + // Now the final pane should fire, demonstrating that the GC time was truncated + tester.advanceInputWatermark(GlobalWindow.INSTANCE.maxTimestamp()); + assertThat(tester.extractOutput(), contains(isWindowedValue(equalTo(55)))); + } + @Test public void testOnElementCombiningAccumulating() throws Exception { // Test basic execution of a trigger using a non-combining window set and accumulating mode.