Preserves compressed windows in PushbackSideInputDoFnRunner
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/38f0b11c Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/38f0b11c Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/38f0b11c Branch: refs/heads/master Commit: 38f0b11cc9028cf347e3c96b6e6116e5a5a9972d Parents: 565e99f Author: Eugene Kirpichov <kirpic...@google.com> Authored: Wed Nov 30 14:28:51 2016 -0800 Committer: Thomas Groh <tg...@google.com> Committed: Wed Nov 30 16:26:33 2016 -0800 ---------------------------------------------------------------------- .../core/PushbackSideInputDoFnRunner.java | 20 ++++++++++++++++---- .../core/PushbackSideInputDoFnRunnerTest.java | 18 +++++++++++------- 2 files changed, 27 insertions(+), 11 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/38f0b11c/runners/core-java/src/main/java/org/apache/beam/runners/core/PushbackSideInputDoFnRunner.java ---------------------------------------------------------------------- diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/PushbackSideInputDoFnRunner.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/PushbackSideInputDoFnRunner.java index 8c169da..460154d 100644 --- a/runners/core-java/src/main/java/org/apache/beam/runners/core/PushbackSideInputDoFnRunner.java +++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/PushbackSideInputDoFnRunner.java @@ -74,17 +74,29 @@ public class PushbackSideInputDoFnRunner<InputT, OutputT> implements DoFnRunner< processElement(elem); return Collections.emptyList(); } - ImmutableList.Builder<WindowedValue<InputT>> pushedBack = ImmutableList.builder(); + ImmutableList.Builder<BoundedWindow> readyWindowsBuilder = ImmutableList.builder(); + ImmutableList.Builder<BoundedWindow> pushedBackWindowsBuilder = ImmutableList.builder(); for (WindowedValue<InputT> windowElem : elem.explodeWindows()) { BoundedWindow mainInputWindow = Iterables.getOnlyElement(windowElem.getWindows()); if (isReady(mainInputWindow)) { - processElement(windowElem); + readyWindowsBuilder.add(mainInputWindow); } else { notReadyWindows.add(mainInputWindow); - pushedBack.add(windowElem); + pushedBackWindowsBuilder.add(mainInputWindow); } } - return pushedBack.build(); + ImmutableList<BoundedWindow> readyWindows = readyWindowsBuilder.build(); + ImmutableList<BoundedWindow> pushedBackWindows = pushedBackWindowsBuilder.build(); + if (!readyWindows.isEmpty()) { + processElement( + WindowedValue.of( + elem.getValue(), elem.getTimestamp(), readyWindows, elem.getPane())); + } + return pushedBackWindows.isEmpty() + ? ImmutableList.<WindowedValue<InputT>>of() + : ImmutableList.of( + WindowedValue.of( + elem.getValue(), elem.getTimestamp(), pushedBackWindows, elem.getPane())); } private boolean isReady(BoundedWindow mainInputWindow) { http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/38f0b11c/runners/core-java/src/test/java/org/apache/beam/runners/core/PushbackSideInputDoFnRunnerTest.java ---------------------------------------------------------------------- diff --git a/runners/core-java/src/test/java/org/apache/beam/runners/core/PushbackSideInputDoFnRunnerTest.java b/runners/core-java/src/test/java/org/apache/beam/runners/core/PushbackSideInputDoFnRunnerTest.java index 59a7c92..f8f4604 100644 --- a/runners/core-java/src/test/java/org/apache/beam/runners/core/PushbackSideInputDoFnRunnerTest.java +++ b/runners/core-java/src/test/java/org/apache/beam/runners/core/PushbackSideInputDoFnRunnerTest.java @@ -17,9 +17,9 @@ */ package org.apache.beam.runners.core; +import static org.hamcrest.Matchers.contains; import static org.hamcrest.Matchers.containsInAnyOrder; import static org.hamcrest.Matchers.emptyIterable; -import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.is; import static org.junit.Assert.assertThat; import static org.mockito.Mockito.when; @@ -27,7 +27,6 @@ import static org.mockito.Mockito.when; import com.google.common.collect.ImmutableList; import java.util.ArrayList; import java.util.List; - import org.apache.beam.sdk.testing.TestPipeline; import org.apache.beam.sdk.transforms.Create; import org.apache.beam.sdk.transforms.Sum; @@ -131,7 +130,7 @@ public class PushbackSideInputDoFnRunnerTest { PaneInfo.ON_TIME_AND_ONLY_FIRING); Iterable<WindowedValue<Integer>> multiWindowPushback = runner.processElementInReadyWindows(multiWindow); - assertThat(multiWindowPushback, equalTo(multiWindow.explodeWindows())); + assertThat(multiWindowPushback, contains(multiWindow)); assertThat(underlying.inputElems, Matchers.<WindowedValue<Integer>>emptyIterable()); } @@ -162,9 +161,14 @@ public class PushbackSideInputDoFnRunnerTest { assertThat( multiWindowPushback, containsInAnyOrder(WindowedValue.timestampedValueInGlobalWindow(2, new Instant(-2L)))); - assertThat(underlying.inputElems, - containsInAnyOrder(WindowedValue.of(2, new Instant(-2), littleWindow, PaneInfo.NO_FIRING), - WindowedValue.of(2, new Instant(-2), bigWindow, PaneInfo.NO_FIRING))); + assertThat( + underlying.inputElems, + containsInAnyOrder( + WindowedValue.of( + 2, + new Instant(-2), + ImmutableList.of(littleWindow, bigWindow), + PaneInfo.NO_FIRING))); } @Test @@ -188,7 +192,7 @@ public class PushbackSideInputDoFnRunnerTest { runner.processElementInReadyWindows(multiWindow); assertThat(multiWindowPushback, emptyIterable()); assertThat(underlying.inputElems, - containsInAnyOrder(ImmutableList.copyOf(multiWindow.explodeWindows()).toArray())); + containsInAnyOrder(ImmutableList.of(multiWindow).toArray())); } @Test