Repository: incubator-beam Updated Branches: refs/heads/master b38c9e9eb -> 3c4b6930e
[BEAM-1154] Get side input from proper window in ReduceFn Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/de109d5b Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/de109d5b Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/de109d5b Branch: refs/heads/master Commit: de109d5b4c7693e935b68233c32e70f3f6b3d513 Parents: 0bdf7fc Author: Eugene Kirpichov <kirpic...@google.com> Authored: Wed Dec 14 14:29:30 2016 -0800 Committer: Eugene Kirpichov <kirpic...@google.com> Committed: Wed Dec 14 15:34:12 2016 -0800 ---------------------------------------------------------------------- .../runners/core/ReduceFnContextFactory.java | 16 +-- .../beam/runners/core/ReduceFnRunnerTest.java | 133 ++++++++++--------- .../beam/sdk/transforms/CombineWithContext.java | 2 +- 3 files changed, 78 insertions(+), 73 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/de109d5b/runners/core-java/src/main/java/org/apache/beam/runners/core/ReduceFnContextFactory.java ---------------------------------------------------------------------- diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/ReduceFnContextFactory.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/ReduceFnContextFactory.java index c5bda9b..c71897d 100644 --- a/runners/core-java/src/main/java/org/apache/beam/runners/core/ReduceFnContextFactory.java +++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/ReduceFnContextFactory.java @@ -28,7 +28,6 @@ import org.apache.beam.sdk.coders.Coder; import org.apache.beam.sdk.options.PipelineOptions; import org.apache.beam.sdk.transforms.windowing.BoundedWindow; import org.apache.beam.sdk.transforms.windowing.PaneInfo; -import org.apache.beam.sdk.transforms.windowing.WindowFn; import org.apache.beam.sdk.util.ActiveWindowSet; import org.apache.beam.sdk.util.SideInputReader; import org.apache.beam.sdk.util.TimeDomain; @@ -98,11 +97,7 @@ class ReduceFnContextFactory<K, InputT, OutputT, W extends BoundedWindow> { activeWindows, windowingStrategy.getWindowFn().windowCoder(), stateInternals, - stateContextFromComponents( - options, - sideInputReader, - window, - windowingStrategy.getWindowFn()), + stateContextFromComponents(options, sideInputReader, window), style); } @@ -512,8 +507,7 @@ class ReduceFnContextFactory<K, InputT, OutputT, W extends BoundedWindow> { private static <W extends BoundedWindow> StateContext<W> stateContextFromComponents( @Nullable final PipelineOptions options, final SideInputReader sideInputReader, - final W mainInputWindow, - final WindowFn<?, W> windowFn) { + final W mainInputWindow) { if (options == null) { return StateContexts.nullContext(); } else { @@ -526,7 +520,11 @@ class ReduceFnContextFactory<K, InputT, OutputT, W extends BoundedWindow> { @Override public <T> T sideInput(PCollectionView<T> view) { - return sideInputReader.get(view, windowFn.getSideInputWindow(mainInputWindow)); + return sideInputReader.get( + view, + view.getWindowingStrategyInternal() + .getWindowFn() + .getSideInputWindow(mainInputWindow)); } @Override http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/de109d5b/runners/core-java/src/test/java/org/apache/beam/runners/core/ReduceFnRunnerTest.java ---------------------------------------------------------------------- diff --git a/runners/core-java/src/test/java/org/apache/beam/runners/core/ReduceFnRunnerTest.java b/runners/core-java/src/test/java/org/apache/beam/runners/core/ReduceFnRunnerTest.java index ba57567..4abfc9a 100644 --- a/runners/core-java/src/test/java/org/apache/beam/runners/core/ReduceFnRunnerTest.java +++ b/runners/core-java/src/test/java/org/apache/beam/runners/core/ReduceFnRunnerTest.java @@ -17,13 +17,14 @@ */ package org.apache.beam.runners.core; -import static com.google.common.base.Preconditions.checkArgument; import static org.apache.beam.runners.core.WindowMatchers.isSingleWindowedValue; import static org.apache.beam.runners.core.WindowMatchers.isWindowedValue; +import static org.hamcrest.Matchers.anyOf; import static org.hamcrest.Matchers.contains; import static org.hamcrest.Matchers.containsInAnyOrder; import static org.hamcrest.Matchers.emptyIterable; import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.greaterThanOrEqualTo; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertThat; @@ -36,7 +37,6 @@ import static org.mockito.Mockito.when; import static org.mockito.Mockito.withSettings; import com.google.common.collect.Iterables; -import java.util.Iterator; import java.util.List; import org.apache.beam.runners.core.triggers.TriggerStateMachine; import org.apache.beam.sdk.coders.VarIntCoder; @@ -348,49 +348,67 @@ public class ReduceFnRunnerTest { @Test public void testOnElementCombiningWithContext() throws Exception { - Integer expectedValue = 5; - WindowingStrategy<?, IntervalWindow> windowingStrategy = WindowingStrategy - .of(FixedWindows.of(Duration.millis(10))) - .withMode(AccumulationMode.DISCARDING_FIRED_PANES) - .withOutputTimeFn(OutputTimeFns.outputAtEarliestInputTimestamp()) - .withAllowedLateness(Duration.millis(100)); + // Create values at timestamps 0 .. 8, windowed into fixed windows of 2. + // Side input windowed into fixed windows of 4: + // main: [ 0 1 ] [ 2 3 ] [ 4 5 ] [ 6 7 ] + // side: [ 100 ] [ 104 ] + // Combine using a CombineFn "side input + sum(main inputs)". + final int firstWindowSideInput = 100; + final int secondWindowSideInput = 104; + final Integer expectedValue = firstWindowSideInput; + WindowingStrategy<?, IntervalWindow> mainInputWindowingStrategy = + WindowingStrategy.of(FixedWindows.of(Duration.millis(2))) + .withMode(AccumulationMode.ACCUMULATING_FIRED_PANES); + + WindowingStrategy<?, IntervalWindow> sideInputWindowingStrategy = + WindowingStrategy.of(FixedWindows.of(Duration.millis(4))); TestOptions options = PipelineOptionsFactory.as(TestOptions.class); - options.setValue(5); + options.setValue(expectedValue); when(mockSideInputReader.contains(Matchers.<PCollectionView<Integer>>any())).thenReturn(true); when(mockSideInputReader.get( - Matchers.<PCollectionView<Integer>>any(), any(BoundedWindow.class))).thenReturn(5); + Matchers.<PCollectionView<Integer>>any(), any(BoundedWindow.class))) + .then( + new Answer<Integer>() { + @Override + public Integer answer(InvocationOnMock invocation) throws Throwable { + IntervalWindow sideInputWindow = (IntervalWindow) invocation.getArguments()[1]; + long startMs = sideInputWindow.start().getMillis(); + long endMs = sideInputWindow.end().getMillis(); + // Window should have been produced by sideInputWindowingStrategy. + assertThat(startMs, anyOf(equalTo(0L), equalTo(4L))); + assertThat(endMs - startMs, equalTo(4L)); + // If startMs == 4 (second window), equal to secondWindowSideInput. + return firstWindowSideInput + (int) startMs; + } + }); @SuppressWarnings({"rawtypes", "unchecked", "unused"}) Object suppressWarningsVar = when(mockView.getWindowingStrategyInternal()) - .thenReturn((WindowingStrategy) windowingStrategy); + .thenReturn((WindowingStrategy) sideInputWindowingStrategy); SumAndVerifyContextFn combineFn = new SumAndVerifyContextFn(mockView, expectedValue); - // Test basic execution of a trigger using a non-combining window set and discarding mode. ReduceFnTester<Integer, Integer, IntervalWindow> tester = ReduceFnTester.combining( - windowingStrategy, mockTriggerStateMachine, combineFn.<String>asKeyedFn(), + mainInputWindowingStrategy, mockTriggerStateMachine, combineFn.<String>asKeyedFn(), VarIntCoder.of(), options, mockSideInputReader); - injectElement(tester, 2); - - when(mockTriggerStateMachine.shouldFire(anyTriggerContext())).thenReturn(true); - injectElement(tester, 3); - when(mockTriggerStateMachine.shouldFire(anyTriggerContext())).thenReturn(true); - triggerShouldFinish(mockTriggerStateMachine); - injectElement(tester, 4); - - // This element shouldn't be seen, because the trigger has finished - injectElement(tester, 6); + for (int i = 0; i < 8; ++i) { + injectElement(tester, i); + } assertThat( tester.extractOutput(), contains( - isSingleWindowedValue(equalTo(5), 2, 0, 10), - isSingleWindowedValue(equalTo(4), 4, 0, 10))); - assertTrue(tester.isMarkedFinished(firstWindow)); - tester.assertHasOnlyGlobalAndFinishedSetsFor(firstWindow); + isSingleWindowedValue(equalTo(0 + firstWindowSideInput), 1, 0, 2), + isSingleWindowedValue(equalTo(0 + 1 + firstWindowSideInput), 1, 0, 2), + isSingleWindowedValue(equalTo(2 + firstWindowSideInput), 3, 2, 4), + isSingleWindowedValue(equalTo(2 + 3 + firstWindowSideInput), 3, 2, 4), + isSingleWindowedValue(equalTo(4 + secondWindowSideInput), 5, 4, 6), + isSingleWindowedValue(equalTo(4 + 5 + secondWindowSideInput), 5, 4, 6), + isSingleWindowedValue(equalTo(6 + secondWindowSideInput), 7, 6, 8), + isSingleWindowedValue(equalTo(6 + 7 + secondWindowSideInput), 7, 6, 8))); } @Test @@ -1424,7 +1442,8 @@ public class ReduceFnRunnerTest { assertEquals(2, output.size()); } - private static class SumAndVerifyContextFn extends CombineFnWithContext<Integer, int[], Integer> { + private static class SumAndVerifyContextFn + extends CombineFnWithContext<Integer, Integer, Integer> { private final PCollectionView<Integer> view; private final int expectedValue; @@ -1433,50 +1452,38 @@ public class ReduceFnRunnerTest { this.view = view; this.expectedValue = expectedValue; } - @Override - public int[] createAccumulator(Context c) { - checkArgument( - c.getPipelineOptions().as(TestOptions.class).getValue() == expectedValue); - checkArgument(c.sideInput(view) == expectedValue); - return wrap(0); + + private void verifyContext(Context c) { + assertThat(expectedValue, equalTo(c.getPipelineOptions().as(TestOptions.class).getValue())); + assertThat(c.sideInput(view), greaterThanOrEqualTo(100)); } @Override - public int[] addInput(int[] accumulator, Integer input, Context c) { - checkArgument( - c.getPipelineOptions().as(TestOptions.class).getValue() == expectedValue); - checkArgument(c.sideInput(view) == expectedValue); - accumulator[0] += input.intValue(); - return accumulator; + public Integer createAccumulator(Context c) { + verifyContext(c); + return 0; } @Override - public int[] mergeAccumulators(Iterable<int[]> accumulators, Context c) { - checkArgument( - c.getPipelineOptions().as(TestOptions.class).getValue() == expectedValue); - checkArgument(c.sideInput(view) == expectedValue); - Iterator<int[]> iter = accumulators.iterator(); - if (!iter.hasNext()) { - return createAccumulator(c); - } else { - int[] running = iter.next(); - while (iter.hasNext()) { - running[0] += iter.next()[0]; - } - return running; - } + public Integer addInput(Integer accumulator, Integer input, Context c) { + verifyContext(c); + return accumulator + input; } @Override - public Integer extractOutput(int[] accumulator, Context c) { - checkArgument( - c.getPipelineOptions().as(TestOptions.class).getValue() == expectedValue); - checkArgument(c.sideInput(view) == expectedValue); - return accumulator[0]; + public Integer mergeAccumulators(Iterable<Integer> accumulators, Context c) { + verifyContext(c); + int res = 0; + for (Integer accum : accumulators) { + res += accum; + } + return res; } - private int[] wrap(int value) { - return new int[] { value }; + @Override + public Integer extractOutput(Integer accumulator, Context c) { + verifyContext(c); + return accumulator + c.sideInput(view); } } @@ -1484,7 +1491,7 @@ public class ReduceFnRunnerTest { * A {@link PipelineOptions} to test combining with context. */ public interface TestOptions extends PipelineOptions { - Integer getValue(); - void setValue(Integer value); + int getValue(); + void setValue(int value); } } http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/de109d5b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/CombineWithContext.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/CombineWithContext.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/CombineWithContext.java index 7ac952c..cd0600a 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/CombineWithContext.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/CombineWithContext.java @@ -48,7 +48,7 @@ public class CombineWithContext { /** * Returns the value of the side input for the window corresponding to the - * window of the main input element. + * main input's window in which values are being combined. */ public abstract <T> T sideInput(PCollectionView<T> view); }