Propagate Pane in WindowEvaluatorFactory

Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/29fc84b2
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/29fc84b2
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/29fc84b2

Branch: refs/heads/master
Commit: 29fc84b2acd64b0028f189ff5350ed91e33854ad
Parents: 4e69a79
Author: Thomas Groh <tg...@google.com>
Authored: Tue Feb 14 16:19:54 2017 -0800
Committer: Thomas Groh <tg...@google.com>
Committed: Tue Feb 14 17:31:34 2017 -0800

----------------------------------------------------------------------
 .../runners/direct/WindowEvaluatorFactory.java  |  3 +-
 .../direct/WindowEvaluatorFactoryTest.java      | 41 ++++++++++----------
 2 files changed, 22 insertions(+), 22 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/29fc84b2/runners/direct-java/src/main/java/org/apache/beam/runners/direct/WindowEvaluatorFactory.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/WindowEvaluatorFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/WindowEvaluatorFactory.java
index 4ca556b..3cf178c 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/WindowEvaluatorFactory.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/WindowEvaluatorFactory.java
@@ -25,7 +25,6 @@ import org.apache.beam.runners.direct.DirectRunner.UncommittedBundle;
 import org.apache.beam.sdk.transforms.AppliedPTransform;
 import org.apache.beam.sdk.transforms.PTransform;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
-import org.apache.beam.sdk.transforms.windowing.PaneInfo;
 import org.apache.beam.sdk.transforms.windowing.Window;
 import org.apache.beam.sdk.transforms.windowing.Window.Bound;
 import org.apache.beam.sdk.transforms.windowing.WindowFn;
@@ -91,7 +90,7 @@ class WindowEvaluatorFactory implements TransformEvaluatorFactory {
         Collection<? extends BoundedWindow> windows = assignWindows(windowFn, element);
         outputBundle.add(
             WindowedValue.<InputT>of(
-                element.getValue(), element.getTimestamp(), windows, PaneInfo.NO_FIRING));
+                element.getValue(), element.getTimestamp(), windows, element.getPane()));
       }
     }
 

http://git-wip-us.apache.org/repos/asf/beam/blob/29fc84b2/runners/direct-java/src/test/java/org/apache/beam/runners/direct/WindowEvaluatorFactoryTest.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/WindowEvaluatorFactoryTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/WindowEvaluatorFactoryTest.java
index aa841ed..7e6eb2f 100644
--- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/WindowEvaluatorFactoryTest.java
+++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/WindowEvaluatorFactoryTest.java
@@ -19,6 +19,7 @@ package org.apache.beam.runners.direct;
 
 import static org.apache.beam.runners.core.WindowMatchers.isSingleWindowedValue;
 import static org.apache.beam.runners.core.WindowMatchers.isWindowedValue;
+import static org.apache.beam.sdk.transforms.windowing.PaneInfo.NO_FIRING;
 import static org.hamcrest.Matchers.containsInAnyOrder;
 import static org.junit.Assert.assertThat;
 import static org.mockito.Mockito.when;
@@ -43,6 +44,7 @@ import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
 import org.apache.beam.sdk.transforms.windowing.IntervalWindow;
 import org.apache.beam.sdk.transforms.windowing.NonMergingWindowFn;
 import org.apache.beam.sdk.transforms.windowing.PaneInfo;
+import org.apache.beam.sdk.transforms.windowing.PaneInfo.Timing;
 import org.apache.beam.sdk.transforms.windowing.SlidingWindows;
 import org.apache.beam.sdk.transforms.windowing.Window;
 import org.apache.beam.sdk.transforms.windowing.Window.Bound;
@@ -77,12 +79,12 @@ public class WindowEvaluatorFactoryTest {
   private WindowedValue<Long> valueInGlobalWindow =
       WindowedValue.timestampedValueInGlobalWindow(3L, new Instant(2L));
 
+  private final PaneInfo intervalWindowPane = PaneInfo.createPane(false, false, Timing.LATE, 3, 2);
   private WindowedValue<Long> valueInIntervalWindow =
       WindowedValue.of(
           Long.valueOf(2L),
           new Instant(-10L),
-          new IntervalWindow(new Instant(-100), EPOCH),
-          PaneInfo.NO_FIRING);
+          new IntervalWindow(new Instant(-100), EPOCH), intervalWindowPane);
 
   private IntervalWindow intervalWindow1 =
       new IntervalWindow(EPOCH, BoundedWindow.TIMESTAMP_MAX_VALUE);
@@ -91,12 +93,13 @@ public class WindowEvaluatorFactoryTest {
       new IntervalWindow(
           EPOCH.plus(Duration.standardDays(3)), EPOCH.plus(Duration.standardDays(6)));
 
+  private final PaneInfo multiWindowPane = PaneInfo.createPane(false, true, Timing.ON_TIME, 3, 0);
   private WindowedValue<Long> valueInGlobalAndTwoIntervalWindows =
       WindowedValue.of(
           Long.valueOf(1L),
           EPOCH.plus(Duration.standardDays(3)),
           ImmutableList.of(GlobalWindow.INSTANCE, intervalWindow1, intervalWindow2),
-          PaneInfo.NO_FIRING);
+          multiWindowPane);
 
   @Rule
   public TestPipeline p = TestPipeline.create().enableAbandonedNodeEnforcement(false);
@@ -158,18 +161,18 @@ public class WindowEvaluatorFactoryTest {
         committed.getElements(),
         containsInAnyOrder(
             // value in global window
-            isSingleWindowedValue(3L, new Instant(2L), firstSecondWindow, PaneInfo.NO_FIRING),
+            isSingleWindowedValue(3L, new Instant(2L), firstSecondWindow, NO_FIRING),
 
             // value in just interval window
-            isSingleWindowedValue(2L, new Instant(-10L), thirdWindow, PaneInfo.NO_FIRING),
+            isSingleWindowedValue(2L, new Instant(-10L), thirdWindow, intervalWindowPane),
 
             // value in global window and two interval windows
             isSingleWindowedValue(
-                1L, EPOCH.plus(Duration.standardDays(3)), firstSecondWindow, PaneInfo.NO_FIRING),
+                1L, EPOCH.plus(Duration.standardDays(3)), firstSecondWindow, multiWindowPane),
             isSingleWindowedValue(
-                1L, EPOCH.plus(Duration.standardDays(3)), firstSecondWindow, PaneInfo.NO_FIRING),
+                1L, EPOCH.plus(Duration.standardDays(3)), firstSecondWindow, multiWindowPane),
             isSingleWindowedValue(
-                1L, EPOCH.plus(Duration.standardDays(3)), firstSecondWindow, PaneInfo.NO_FIRING)));
+                1L, EPOCH.plus(Duration.standardDays(3)), firstSecondWindow, multiWindowPane)));
   }
 
   @Test
@@ -204,31 +207,31 @@ public class WindowEvaluatorFactoryTest {
                 valueInGlobalWindow.getValue(),
                 valueInGlobalWindow.getTimestamp(),
                 ImmutableSet.of(w1, wMinusSlide),
-                PaneInfo.NO_FIRING),
+                NO_FIRING),
 
             // Value in interval window mapped to one windowed value in multiple windows
             isWindowedValue(
                 valueInIntervalWindow.getValue(),
                 valueInIntervalWindow.getTimestamp(),
                 ImmutableSet.of(wMinus1, wMinusSlide),
-                PaneInfo.NO_FIRING),
+                valueInIntervalWindow.getPane()),
 
             // Value in three windows mapped to three windowed values in the same multiple windows
             isWindowedValue(
                 valueInGlobalAndTwoIntervalWindows.getValue(),
                 valueInGlobalAndTwoIntervalWindows.getTimestamp(),
                 ImmutableSet.of(w1, w2),
-                PaneInfo.NO_FIRING),
+                valueInGlobalAndTwoIntervalWindows.getPane()),
             isWindowedValue(
                 valueInGlobalAndTwoIntervalWindows.getValue(),
                 valueInGlobalAndTwoIntervalWindows.getTimestamp(),
                 ImmutableSet.of(w1, w2),
-                PaneInfo.NO_FIRING),
+                valueInGlobalAndTwoIntervalWindows.getPane()),
             isWindowedValue(
                 valueInGlobalAndTwoIntervalWindows.getValue(),
                 valueInGlobalAndTwoIntervalWindows.getTimestamp(),
                 ImmutableSet.of(w1, w2),
-                PaneInfo.NO_FIRING)));
+                valueInGlobalAndTwoIntervalWindows.getPane())));
   }
 
   @Test
@@ -256,14 +259,14 @@ public class WindowEvaluatorFactoryTest {
                 new IntervalWindow(
                     valueInGlobalWindow.getTimestamp(),
                     valueInGlobalWindow.getTimestamp().plus(1L)),
-                PaneInfo.NO_FIRING),
+                valueInGlobalWindow.getPane()),
 
             // Value in interval window mapped to the same window
             isWindowedValue(
                 valueInIntervalWindow.getValue(),
                 valueInIntervalWindow.getTimestamp(),
                 valueInIntervalWindow.getWindows(),
-                PaneInfo.NO_FIRING),
+                valueInIntervalWindow.getPane()),
 
             // Value in global window and two interval windows exploded and mapped in both ways
             isSingleWindowedValue(
@@ -272,19 +275,17 @@ public class WindowEvaluatorFactoryTest {
                 new IntervalWindow(
                     valueInGlobalAndTwoIntervalWindows.getTimestamp(),
                     valueInGlobalAndTwoIntervalWindows.getTimestamp().plus(1L)),
-                PaneInfo.NO_FIRING),
-
+                valueInGlobalAndTwoIntervalWindows.getPane()),
             isSingleWindowedValue(
                 valueInGlobalAndTwoIntervalWindows.getValue(),
                 valueInGlobalAndTwoIntervalWindows.getTimestamp(),
                 intervalWindow1,
-                PaneInfo.NO_FIRING),
-
+                valueInGlobalAndTwoIntervalWindows.getPane()),
             isSingleWindowedValue(
                 valueInGlobalAndTwoIntervalWindows.getValue(),
                 valueInGlobalAndTwoIntervalWindows.getTimestamp(),
                 intervalWindow2,
-                PaneInfo.NO_FIRING)));
+                valueInGlobalAndTwoIntervalWindows.getPane())));
   }
 
   private CommittedBundle<Long> createInputBundle() {

Reply via email to