Preserves compressed windows in PushbackSideInputDoFnRunner

Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/38f0b11c
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/38f0b11c
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/38f0b11c

Branch: refs/heads/master
Commit: 38f0b11cc9028cf347e3c96b6e6116e5a5a9972d
Parents: 565e99f
Author: Eugene Kirpichov <kirpic...@google.com>
Authored: Wed Nov 30 14:28:51 2016 -0800
Committer: Thomas Groh <tg...@google.com>
Committed: Wed Nov 30 16:26:33 2016 -0800

----------------------------------------------------------------------
 .../core/PushbackSideInputDoFnRunner.java       | 20 ++++++++++++++++----
 .../core/PushbackSideInputDoFnRunnerTest.java   | 18 +++++++++++-------
 2 files changed, 27 insertions(+), 11 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/38f0b11c/runners/core-java/src/main/java/org/apache/beam/runners/core/PushbackSideInputDoFnRunner.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/PushbackSideInputDoFnRunner.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/PushbackSideInputDoFnRunner.java
index 8c169da..460154d 100644
--- a/runners/core-java/src/main/java/org/apache/beam/runners/core/PushbackSideInputDoFnRunner.java
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/PushbackSideInputDoFnRunner.java
@@ -74,17 +74,29 @@ public class PushbackSideInputDoFnRunner<InputT, OutputT> implements DoFnRunner<
       processElement(elem);
       return Collections.emptyList();
     }
-    ImmutableList.Builder<WindowedValue<InputT>> pushedBack = ImmutableList.builder();
+    ImmutableList.Builder<BoundedWindow> readyWindowsBuilder = ImmutableList.builder();
+    ImmutableList.Builder<BoundedWindow> pushedBackWindowsBuilder = ImmutableList.builder();
     for (WindowedValue<InputT> windowElem : elem.explodeWindows()) {
       BoundedWindow mainInputWindow = Iterables.getOnlyElement(windowElem.getWindows());
       if (isReady(mainInputWindow)) {
-        processElement(windowElem);
+        readyWindowsBuilder.add(mainInputWindow);
       } else {
         notReadyWindows.add(mainInputWindow);
-        pushedBack.add(windowElem);
+        pushedBackWindowsBuilder.add(mainInputWindow);
       }
     }
-    return pushedBack.build();
+    ImmutableList<BoundedWindow> readyWindows = readyWindowsBuilder.build();
+    ImmutableList<BoundedWindow> pushedBackWindows = pushedBackWindowsBuilder.build();
+    if (!readyWindows.isEmpty()) {
+      processElement(
+          WindowedValue.of(
+              elem.getValue(), elem.getTimestamp(), readyWindows, elem.getPane()));
+    }
+    return pushedBackWindows.isEmpty()
+        ? ImmutableList.<WindowedValue<InputT>>of()
+        : ImmutableList.of(
+            WindowedValue.of(
+                elem.getValue(), elem.getTimestamp(), pushedBackWindows, elem.getPane()));
   }
 
   private boolean isReady(BoundedWindow mainInputWindow) {

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/38f0b11c/runners/core-java/src/test/java/org/apache/beam/runners/core/PushbackSideInputDoFnRunnerTest.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/test/java/org/apache/beam/runners/core/PushbackSideInputDoFnRunnerTest.java b/runners/core-java/src/test/java/org/apache/beam/runners/core/PushbackSideInputDoFnRunnerTest.java
index 59a7c92..f8f4604 100644
--- a/runners/core-java/src/test/java/org/apache/beam/runners/core/PushbackSideInputDoFnRunnerTest.java
+++ b/runners/core-java/src/test/java/org/apache/beam/runners/core/PushbackSideInputDoFnRunnerTest.java
@@ -17,9 +17,9 @@
  */
 package org.apache.beam.runners.core;
 
+import static org.hamcrest.Matchers.contains;
 import static org.hamcrest.Matchers.containsInAnyOrder;
 import static org.hamcrest.Matchers.emptyIterable;
-import static org.hamcrest.Matchers.equalTo;
 import static org.hamcrest.Matchers.is;
 import static org.junit.Assert.assertThat;
 import static org.mockito.Mockito.when;
@@ -27,7 +27,6 @@ import static org.mockito.Mockito.when;
 import com.google.common.collect.ImmutableList;
 import java.util.ArrayList;
 import java.util.List;
-
 import org.apache.beam.sdk.testing.TestPipeline;
 import org.apache.beam.sdk.transforms.Create;
 import org.apache.beam.sdk.transforms.Sum;
@@ -131,7 +130,7 @@ public class PushbackSideInputDoFnRunnerTest {
             PaneInfo.ON_TIME_AND_ONLY_FIRING);
     Iterable<WindowedValue<Integer>> multiWindowPushback =
         runner.processElementInReadyWindows(multiWindow);
-    assertThat(multiWindowPushback, equalTo(multiWindow.explodeWindows()));
+    assertThat(multiWindowPushback, contains(multiWindow));
     assertThat(underlying.inputElems, Matchers.<WindowedValue<Integer>>emptyIterable());
   }
 
@@ -162,9 +161,14 @@ public class PushbackSideInputDoFnRunnerTest {
     assertThat(
         multiWindowPushback,
         containsInAnyOrder(WindowedValue.timestampedValueInGlobalWindow(2, new Instant(-2L))));
-    assertThat(underlying.inputElems,
-        containsInAnyOrder(WindowedValue.of(2, new Instant(-2), littleWindow, PaneInfo.NO_FIRING),
-            WindowedValue.of(2, new Instant(-2), bigWindow, PaneInfo.NO_FIRING)));
+    assertThat(
+        underlying.inputElems,
+        containsInAnyOrder(
+            WindowedValue.of(
+                2,
+                new Instant(-2),
+                ImmutableList.of(littleWindow, bigWindow),
+                PaneInfo.NO_FIRING)));
   }
 
   @Test
@@ -188,7 +192,7 @@ public class PushbackSideInputDoFnRunnerTest {
         runner.processElementInReadyWindows(multiWindow);
     assertThat(multiWindowPushback, emptyIterable());
     assertThat(underlying.inputElems,
-        containsInAnyOrder(ImmutableList.copyOf(multiWindow.explodeWindows()).toArray()));
+        containsInAnyOrder(ImmutableList.of(multiWindow).toArray()));
   }
 
   @Test

Reply via email to