Repository: incubator-beam
Updated Branches:
  refs/heads/master b38c9e9eb -> 3c4b6930e


[BEAM-1154] Get side input from proper window in ReduceFn


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/de109d5b
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/de109d5b
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/de109d5b

Branch: refs/heads/master
Commit: de109d5b4c7693e935b68233c32e70f3f6b3d513
Parents: 0bdf7fc
Author: Eugene Kirpichov <kirpic...@google.com>
Authored: Wed Dec 14 14:29:30 2016 -0800
Committer: Eugene Kirpichov <kirpic...@google.com>
Committed: Wed Dec 14 15:34:12 2016 -0800

----------------------------------------------------------------------
 .../runners/core/ReduceFnContextFactory.java    |  16 +--
 .../beam/runners/core/ReduceFnRunnerTest.java   | 133 ++++++++++---------
 .../beam/sdk/transforms/CombineWithContext.java |   2 +-
 3 files changed, 78 insertions(+), 73 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/de109d5b/runners/core-java/src/main/java/org/apache/beam/runners/core/ReduceFnContextFactory.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/ReduceFnContextFactory.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/ReduceFnContextFactory.java
index c5bda9b..c71897d 100644
--- a/runners/core-java/src/main/java/org/apache/beam/runners/core/ReduceFnContextFactory.java
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/ReduceFnContextFactory.java
@@ -28,7 +28,6 @@ import org.apache.beam.sdk.coders.Coder;
 import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
 import org.apache.beam.sdk.transforms.windowing.PaneInfo;
-import org.apache.beam.sdk.transforms.windowing.WindowFn;
 import org.apache.beam.sdk.util.ActiveWindowSet;
 import org.apache.beam.sdk.util.SideInputReader;
 import org.apache.beam.sdk.util.TimeDomain;
@@ -98,11 +97,7 @@ class ReduceFnContextFactory<K, InputT, OutputT, W extends 
BoundedWindow> {
         activeWindows,
         windowingStrategy.getWindowFn().windowCoder(),
         stateInternals,
-        stateContextFromComponents(
-            options,
-            sideInputReader,
-            window,
-            windowingStrategy.getWindowFn()),
+        stateContextFromComponents(options, sideInputReader, window),
         style);
   }
 
@@ -512,8 +507,7 @@ class ReduceFnContextFactory<K, InputT, OutputT, W extends 
BoundedWindow> {
   private static <W extends BoundedWindow> StateContext<W> 
stateContextFromComponents(
       @Nullable final PipelineOptions options,
       final SideInputReader sideInputReader,
-      final W mainInputWindow,
-      final WindowFn<?, W> windowFn) {
+      final W mainInputWindow) {
     if (options == null) {
       return StateContexts.nullContext();
     } else {
@@ -526,7 +520,11 @@ class ReduceFnContextFactory<K, InputT, OutputT, W extends 
BoundedWindow> {
 
         @Override
         public <T> T sideInput(PCollectionView<T> view) {
-          return sideInputReader.get(view, 
windowFn.getSideInputWindow(mainInputWindow));
+          return sideInputReader.get(
+              view,
+              view.getWindowingStrategyInternal()
+                  .getWindowFn()
+                  .getSideInputWindow(mainInputWindow));
         }
 
         @Override

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/de109d5b/runners/core-java/src/test/java/org/apache/beam/runners/core/ReduceFnRunnerTest.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/test/java/org/apache/beam/runners/core/ReduceFnRunnerTest.java b/runners/core-java/src/test/java/org/apache/beam/runners/core/ReduceFnRunnerTest.java
index ba57567..4abfc9a 100644
--- a/runners/core-java/src/test/java/org/apache/beam/runners/core/ReduceFnRunnerTest.java
+++ b/runners/core-java/src/test/java/org/apache/beam/runners/core/ReduceFnRunnerTest.java
@@ -17,13 +17,14 @@
  */
 package org.apache.beam.runners.core;
 
-import static com.google.common.base.Preconditions.checkArgument;
 import static 
org.apache.beam.runners.core.WindowMatchers.isSingleWindowedValue;
 import static org.apache.beam.runners.core.WindowMatchers.isWindowedValue;
+import static org.hamcrest.Matchers.anyOf;
 import static org.hamcrest.Matchers.contains;
 import static org.hamcrest.Matchers.containsInAnyOrder;
 import static org.hamcrest.Matchers.emptyIterable;
 import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.Matchers.greaterThanOrEqualTo;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertThat;
@@ -36,7 +37,6 @@ import static org.mockito.Mockito.when;
 import static org.mockito.Mockito.withSettings;
 
 import com.google.common.collect.Iterables;
-import java.util.Iterator;
 import java.util.List;
 import org.apache.beam.runners.core.triggers.TriggerStateMachine;
 import org.apache.beam.sdk.coders.VarIntCoder;
@@ -348,49 +348,67 @@ public class ReduceFnRunnerTest {
 
   @Test
   public void testOnElementCombiningWithContext() throws Exception {
-    Integer expectedValue = 5;
-    WindowingStrategy<?, IntervalWindow> windowingStrategy = WindowingStrategy
-        .of(FixedWindows.of(Duration.millis(10)))
-        .withMode(AccumulationMode.DISCARDING_FIRED_PANES)
-        .withOutputTimeFn(OutputTimeFns.outputAtEarliestInputTimestamp())
-        .withAllowedLateness(Duration.millis(100));
+    // Create values at timestamps 0 .. 7, windowed into fixed windows of 2.
+    // Side input windowed into fixed windows of 4:
+    // main: [ 0 1 ] [ 2 3 ] [ 4 5 ] [ 6 7 ]
+    // side: [     100     ] [     104     ]
+    // Combine using a CombineFn "side input + sum(main inputs)".
+    final int firstWindowSideInput = 100;
+    final int secondWindowSideInput = 104;
+    final Integer expectedValue = firstWindowSideInput;
+    WindowingStrategy<?, IntervalWindow> mainInputWindowingStrategy =
+        WindowingStrategy.of(FixedWindows.of(Duration.millis(2)))
+            .withMode(AccumulationMode.ACCUMULATING_FIRED_PANES);
+
+    WindowingStrategy<?, IntervalWindow> sideInputWindowingStrategy =
+        WindowingStrategy.of(FixedWindows.of(Duration.millis(4)));
 
     TestOptions options = PipelineOptionsFactory.as(TestOptions.class);
-    options.setValue(5);
+    options.setValue(expectedValue);
 
     
when(mockSideInputReader.contains(Matchers.<PCollectionView<Integer>>any())).thenReturn(true);
     when(mockSideInputReader.get(
-        Matchers.<PCollectionView<Integer>>any(), 
any(BoundedWindow.class))).thenReturn(5);
+            Matchers.<PCollectionView<Integer>>any(), 
any(BoundedWindow.class)))
+        .then(
+            new Answer<Integer>() {
+              @Override
+              public Integer answer(InvocationOnMock invocation) throws 
Throwable {
+                IntervalWindow sideInputWindow = (IntervalWindow) 
invocation.getArguments()[1];
+                long startMs = sideInputWindow.start().getMillis();
+                long endMs = sideInputWindow.end().getMillis();
+                // Window should have been produced by sideInputWindowingStrategy.
+                assertThat(startMs, anyOf(equalTo(0L), equalTo(4L)));
+                assertThat(endMs - startMs, equalTo(4L));
+                // If startMs == 4 (second window), equal to secondWindowSideInput.
+                return firstWindowSideInput + (int) startMs;
+              }
+            });
 
     @SuppressWarnings({"rawtypes", "unchecked", "unused"})
     Object suppressWarningsVar = when(mockView.getWindowingStrategyInternal())
-        .thenReturn((WindowingStrategy) windowingStrategy);
+        .thenReturn((WindowingStrategy) sideInputWindowingStrategy);
 
     SumAndVerifyContextFn combineFn = new SumAndVerifyContextFn(mockView, 
expectedValue);
-    // Test basic execution of a trigger using a non-combining window set and 
discarding mode.
     ReduceFnTester<Integer, Integer, IntervalWindow> tester = 
ReduceFnTester.combining(
-        windowingStrategy, mockTriggerStateMachine, 
combineFn.<String>asKeyedFn(),
+        mainInputWindowingStrategy, mockTriggerStateMachine, 
combineFn.<String>asKeyedFn(),
         VarIntCoder.of(), options, mockSideInputReader);
 
-    injectElement(tester, 2);
-
-    
when(mockTriggerStateMachine.shouldFire(anyTriggerContext())).thenReturn(true);
-    injectElement(tester, 3);
-
     
when(mockTriggerStateMachine.shouldFire(anyTriggerContext())).thenReturn(true);
-    triggerShouldFinish(mockTriggerStateMachine);
-    injectElement(tester, 4);
-
-    // This element shouldn't be seen, because the trigger has finished
-    injectElement(tester, 6);
+    for (int i = 0; i < 8; ++i) {
+      injectElement(tester, i);
+    }
 
     assertThat(
         tester.extractOutput(),
         contains(
-            isSingleWindowedValue(equalTo(5), 2, 0, 10),
-            isSingleWindowedValue(equalTo(4), 4, 0, 10)));
-    assertTrue(tester.isMarkedFinished(firstWindow));
-    tester.assertHasOnlyGlobalAndFinishedSetsFor(firstWindow);
+            isSingleWindowedValue(equalTo(0 + firstWindowSideInput), 1, 0, 2),
+            isSingleWindowedValue(equalTo(0 + 1 + firstWindowSideInput), 1, 0, 
2),
+            isSingleWindowedValue(equalTo(2 + firstWindowSideInput), 3, 2, 4),
+            isSingleWindowedValue(equalTo(2 + 3 + firstWindowSideInput), 3, 2, 
4),
+            isSingleWindowedValue(equalTo(4 + secondWindowSideInput), 5, 4, 6),
+            isSingleWindowedValue(equalTo(4 + 5 + secondWindowSideInput), 5, 
4, 6),
+            isSingleWindowedValue(equalTo(6 + secondWindowSideInput), 7, 6, 8),
+            isSingleWindowedValue(equalTo(6 + 7 + secondWindowSideInput), 7, 
6, 8)));
   }
 
   @Test
@@ -1424,7 +1442,8 @@ public class ReduceFnRunnerTest {
     assertEquals(2, output.size());
   }
 
-  private static class SumAndVerifyContextFn extends 
CombineFnWithContext<Integer, int[], Integer> {
+  private static class SumAndVerifyContextFn
+      extends CombineFnWithContext<Integer, Integer, Integer> {
 
     private final PCollectionView<Integer> view;
     private final int expectedValue;
@@ -1433,50 +1452,38 @@ public class ReduceFnRunnerTest {
       this.view = view;
       this.expectedValue = expectedValue;
     }
-    @Override
-    public int[] createAccumulator(Context c) {
-      checkArgument(
-          c.getPipelineOptions().as(TestOptions.class).getValue() == 
expectedValue);
-      checkArgument(c.sideInput(view) == expectedValue);
-      return wrap(0);
+
+    private void verifyContext(Context c) {
+      assertThat(expectedValue, 
equalTo(c.getPipelineOptions().as(TestOptions.class).getValue()));
+      assertThat(c.sideInput(view), greaterThanOrEqualTo(100));
     }
 
     @Override
-    public int[] addInput(int[] accumulator, Integer input, Context c) {
-      checkArgument(
-          c.getPipelineOptions().as(TestOptions.class).getValue() == 
expectedValue);
-      checkArgument(c.sideInput(view) == expectedValue);
-      accumulator[0] += input.intValue();
-      return accumulator;
+    public Integer createAccumulator(Context c) {
+      verifyContext(c);
+      return 0;
     }
 
     @Override
-    public int[] mergeAccumulators(Iterable<int[]> accumulators, Context c) {
-      checkArgument(
-          c.getPipelineOptions().as(TestOptions.class).getValue() == 
expectedValue);
-      checkArgument(c.sideInput(view) == expectedValue);
-      Iterator<int[]> iter = accumulators.iterator();
-      if (!iter.hasNext()) {
-        return createAccumulator(c);
-      } else {
-        int[] running = iter.next();
-        while (iter.hasNext()) {
-          running[0] += iter.next()[0];
-        }
-        return running;
-      }
+    public Integer addInput(Integer accumulator, Integer input, Context c) {
+      verifyContext(c);
+      return accumulator + input;
     }
 
     @Override
-    public Integer extractOutput(int[] accumulator, Context c) {
-      checkArgument(
-          c.getPipelineOptions().as(TestOptions.class).getValue() == 
expectedValue);
-      checkArgument(c.sideInput(view) == expectedValue);
-      return accumulator[0];
+    public Integer mergeAccumulators(Iterable<Integer> accumulators, Context 
c) {
+      verifyContext(c);
+      int res = 0;
+      for (Integer accum : accumulators) {
+        res += accum;
+      }
+      return res;
     }
 
-    private int[] wrap(int value) {
-      return new int[] { value };
+    @Override
+    public Integer extractOutput(Integer accumulator, Context c) {
+      verifyContext(c);
+      return accumulator + c.sideInput(view);
     }
   }
 
@@ -1484,7 +1491,7 @@ public class ReduceFnRunnerTest {
    * A {@link PipelineOptions} to test combining with context.
    */
   public interface TestOptions extends PipelineOptions {
-    Integer getValue();
-    void setValue(Integer value);
+    int getValue();
+    void setValue(int value);
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/de109d5b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/CombineWithContext.java
----------------------------------------------------------------------
diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/CombineWithContext.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/CombineWithContext.java
index 7ac952c..cd0600a 100644
--- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/CombineWithContext.java
+++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/CombineWithContext.java
@@ -48,7 +48,7 @@ public class CombineWithContext {
 
     /**
      * Returns the value of the side input for the window corresponding to the
-     * window of the main input element.
+     * main input's window in which values are being combined.
      */
     public abstract <T> T sideInput(PCollectionView<T> view);
   }

Reply via email to