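Propagate Pane in WindowEvaluatorFactory: each re-windowed output element now carries the input element's own PaneInfo (element.getPane()) instead of always being emitted with PaneInfo.NO_FIRING.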
Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/29fc84b2 Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/29fc84b2 Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/29fc84b2 Branch: refs/heads/master Commit: 29fc84b2acd64b0028f189ff5350ed91e33854ad Parents: 4e69a79 Author: Thomas Groh <tg...@google.com> Authored: Tue Feb 14 16:19:54 2017 -0800 Committer: Thomas Groh <tg...@google.com> Committed: Tue Feb 14 17:31:34 2017 -0800 ---------------------------------------------------------------------- .../runners/direct/WindowEvaluatorFactory.java | 3 +- .../direct/WindowEvaluatorFactoryTest.java | 41 ++++++++++---------- 2 files changed, 22 insertions(+), 22 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/beam/blob/29fc84b2/runners/direct-java/src/main/java/org/apache/beam/runners/direct/WindowEvaluatorFactory.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/WindowEvaluatorFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/WindowEvaluatorFactory.java index 4ca556b..3cf178c 100644 --- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/WindowEvaluatorFactory.java +++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/WindowEvaluatorFactory.java @@ -25,7 +25,6 @@ import org.apache.beam.runners.direct.DirectRunner.UncommittedBundle; import org.apache.beam.sdk.transforms.AppliedPTransform; import org.apache.beam.sdk.transforms.PTransform; import org.apache.beam.sdk.transforms.windowing.BoundedWindow; -import org.apache.beam.sdk.transforms.windowing.PaneInfo; import org.apache.beam.sdk.transforms.windowing.Window; import org.apache.beam.sdk.transforms.windowing.Window.Bound; import org.apache.beam.sdk.transforms.windowing.WindowFn; @@ -91,7 +90,7 @@ class 
WindowEvaluatorFactory implements TransformEvaluatorFactory { Collection<? extends BoundedWindow> windows = assignWindows(windowFn, element); outputBundle.add( WindowedValue.<InputT>of( - element.getValue(), element.getTimestamp(), windows, PaneInfo.NO_FIRING)); + element.getValue(), element.getTimestamp(), windows, element.getPane())); } } http://git-wip-us.apache.org/repos/asf/beam/blob/29fc84b2/runners/direct-java/src/test/java/org/apache/beam/runners/direct/WindowEvaluatorFactoryTest.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/WindowEvaluatorFactoryTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/WindowEvaluatorFactoryTest.java index aa841ed..7e6eb2f 100644 --- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/WindowEvaluatorFactoryTest.java +++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/WindowEvaluatorFactoryTest.java @@ -19,6 +19,7 @@ package org.apache.beam.runners.direct; import static org.apache.beam.runners.core.WindowMatchers.isSingleWindowedValue; import static org.apache.beam.runners.core.WindowMatchers.isWindowedValue; +import static org.apache.beam.sdk.transforms.windowing.PaneInfo.NO_FIRING; import static org.hamcrest.Matchers.containsInAnyOrder; import static org.junit.Assert.assertThat; import static org.mockito.Mockito.when; @@ -43,6 +44,7 @@ import org.apache.beam.sdk.transforms.windowing.GlobalWindow; import org.apache.beam.sdk.transforms.windowing.IntervalWindow; import org.apache.beam.sdk.transforms.windowing.NonMergingWindowFn; import org.apache.beam.sdk.transforms.windowing.PaneInfo; +import org.apache.beam.sdk.transforms.windowing.PaneInfo.Timing; import org.apache.beam.sdk.transforms.windowing.SlidingWindows; import org.apache.beam.sdk.transforms.windowing.Window; import org.apache.beam.sdk.transforms.windowing.Window.Bound; @@ -77,12 +79,12 @@ public class 
WindowEvaluatorFactoryTest { private WindowedValue<Long> valueInGlobalWindow = WindowedValue.timestampedValueInGlobalWindow(3L, new Instant(2L)); + private final PaneInfo intervalWindowPane = PaneInfo.createPane(false, false, Timing.LATE, 3, 2); private WindowedValue<Long> valueInIntervalWindow = WindowedValue.of( Long.valueOf(2L), new Instant(-10L), - new IntervalWindow(new Instant(-100), EPOCH), - PaneInfo.NO_FIRING); + new IntervalWindow(new Instant(-100), EPOCH), intervalWindowPane); private IntervalWindow intervalWindow1 = new IntervalWindow(EPOCH, BoundedWindow.TIMESTAMP_MAX_VALUE); @@ -91,12 +93,13 @@ public class WindowEvaluatorFactoryTest { new IntervalWindow( EPOCH.plus(Duration.standardDays(3)), EPOCH.plus(Duration.standardDays(6))); + private final PaneInfo multiWindowPane = PaneInfo.createPane(false, true, Timing.ON_TIME, 3, 0); private WindowedValue<Long> valueInGlobalAndTwoIntervalWindows = WindowedValue.of( Long.valueOf(1L), EPOCH.plus(Duration.standardDays(3)), ImmutableList.of(GlobalWindow.INSTANCE, intervalWindow1, intervalWindow2), - PaneInfo.NO_FIRING); + multiWindowPane); @Rule public TestPipeline p = TestPipeline.create().enableAbandonedNodeEnforcement(false); @@ -158,18 +161,18 @@ public class WindowEvaluatorFactoryTest { committed.getElements(), containsInAnyOrder( // value in global window - isSingleWindowedValue(3L, new Instant(2L), firstSecondWindow, PaneInfo.NO_FIRING), + isSingleWindowedValue(3L, new Instant(2L), firstSecondWindow, NO_FIRING), // value in just interval window - isSingleWindowedValue(2L, new Instant(-10L), thirdWindow, PaneInfo.NO_FIRING), + isSingleWindowedValue(2L, new Instant(-10L), thirdWindow, intervalWindowPane), // value in global window and two interval windows isSingleWindowedValue( - 1L, EPOCH.plus(Duration.standardDays(3)), firstSecondWindow, PaneInfo.NO_FIRING), + 1L, EPOCH.plus(Duration.standardDays(3)), firstSecondWindow, multiWindowPane), isSingleWindowedValue( - 1L, EPOCH.plus(Duration.standardDays(3)), 
firstSecondWindow, PaneInfo.NO_FIRING), + 1L, EPOCH.plus(Duration.standardDays(3)), firstSecondWindow, multiWindowPane), isSingleWindowedValue( - 1L, EPOCH.plus(Duration.standardDays(3)), firstSecondWindow, PaneInfo.NO_FIRING))); + 1L, EPOCH.plus(Duration.standardDays(3)), firstSecondWindow, multiWindowPane))); } @Test @@ -204,31 +207,31 @@ public class WindowEvaluatorFactoryTest { valueInGlobalWindow.getValue(), valueInGlobalWindow.getTimestamp(), ImmutableSet.of(w1, wMinusSlide), - PaneInfo.NO_FIRING), + NO_FIRING), // Value in interval window mapped to one windowed value in multiple windows isWindowedValue( valueInIntervalWindow.getValue(), valueInIntervalWindow.getTimestamp(), ImmutableSet.of(wMinus1, wMinusSlide), - PaneInfo.NO_FIRING), + valueInIntervalWindow.getPane()), // Value in three windows mapped to three windowed values in the same multiple windows isWindowedValue( valueInGlobalAndTwoIntervalWindows.getValue(), valueInGlobalAndTwoIntervalWindows.getTimestamp(), ImmutableSet.of(w1, w2), - PaneInfo.NO_FIRING), + valueInGlobalAndTwoIntervalWindows.getPane()), isWindowedValue( valueInGlobalAndTwoIntervalWindows.getValue(), valueInGlobalAndTwoIntervalWindows.getTimestamp(), ImmutableSet.of(w1, w2), - PaneInfo.NO_FIRING), + valueInGlobalAndTwoIntervalWindows.getPane()), isWindowedValue( valueInGlobalAndTwoIntervalWindows.getValue(), valueInGlobalAndTwoIntervalWindows.getTimestamp(), ImmutableSet.of(w1, w2), - PaneInfo.NO_FIRING))); + valueInGlobalAndTwoIntervalWindows.getPane()))); } @Test @@ -256,14 +259,14 @@ public class WindowEvaluatorFactoryTest { new IntervalWindow( valueInGlobalWindow.getTimestamp(), valueInGlobalWindow.getTimestamp().plus(1L)), - PaneInfo.NO_FIRING), + valueInGlobalWindow.getPane()), // Value in interval window mapped to the same window isWindowedValue( valueInIntervalWindow.getValue(), valueInIntervalWindow.getTimestamp(), valueInIntervalWindow.getWindows(), - PaneInfo.NO_FIRING), + valueInIntervalWindow.getPane()), // Value in 
global window and two interval windows exploded and mapped in both ways isSingleWindowedValue( @@ -272,19 +275,17 @@ public class WindowEvaluatorFactoryTest { new IntervalWindow( valueInGlobalAndTwoIntervalWindows.getTimestamp(), valueInGlobalAndTwoIntervalWindows.getTimestamp().plus(1L)), - PaneInfo.NO_FIRING), - + valueInGlobalAndTwoIntervalWindows.getPane()), isSingleWindowedValue( valueInGlobalAndTwoIntervalWindows.getValue(), valueInGlobalAndTwoIntervalWindows.getTimestamp(), intervalWindow1, - PaneInfo.NO_FIRING), - + valueInGlobalAndTwoIntervalWindows.getPane()), isSingleWindowedValue( valueInGlobalAndTwoIntervalWindows.getValue(), valueInGlobalAndTwoIntervalWindows.getTimestamp(), intervalWindow2, - PaneInfo.NO_FIRING))); + valueInGlobalAndTwoIntervalWindows.getPane()))); } private CommittedBundle<Long> createInputBundle() {