Use WindowMappingFn where possible

Migrates callers away from the use of getWindowingStrategyInternal(),
permitting future changes to make the WindowMappingFn configurable.


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/9e8cf0c5
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/9e8cf0c5
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/9e8cf0c5

Branch: refs/heads/master
Commit: 9e8cf0c5ea7f47a9d7ec05272d56508962c86918
Parents: cc5f78d
Author: Thomas Groh <tg...@google.com>
Authored: Wed Apr 5 11:53:03 2017 -0700
Committer: Thomas Groh <tg...@google.com>
Committed: Wed Apr 5 18:08:03 2017 -0700

----------------------------------------------------------------------
 ...tputAndTimeBoundedSplittableProcessElementInvoker.java |  3 +--
 .../beam/runners/core/PushbackSideInputDoFnRunner.java    |  2 +-
 .../apache/beam/runners/core/ReduceFnContextFactory.java  |  3 +--
 .../org/apache/beam/runners/core/SimpleDoFnRunner.java    |  2 +-
 .../org/apache/beam/runners/core/SimpleOldDoFnRunner.java |  2 +-
 .../org/apache/beam/runners/core/ReduceFnRunnerTest.java  | 10 ++++------
 .../java/org/apache/beam/sdk/transforms/DoFnTester.java   |  3 +--
 .../org/apache/beam/sdk/util/CombineContextFactory.java   |  2 +-
 8 files changed, 11 insertions(+), 16 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/9e8cf0c5/runners/core-java/src/main/java/org/apache/beam/runners/core/OutputAndTimeBoundedSplittableProcessElementInvoker.java
----------------------------------------------------------------------
diff --git 
a/runners/core-java/src/main/java/org/apache/beam/runners/core/OutputAndTimeBoundedSplittableProcessElementInvoker.java
 
b/runners/core-java/src/main/java/org/apache/beam/runners/core/OutputAndTimeBoundedSplittableProcessElementInvoker.java
index 5aa7605..357094c 100644
--- 
a/runners/core-java/src/main/java/org/apache/beam/runners/core/OutputAndTimeBoundedSplittableProcessElementInvoker.java
+++ 
b/runners/core-java/src/main/java/org/apache/beam/runners/core/OutputAndTimeBoundedSplittableProcessElementInvoker.java
@@ -226,8 +226,7 @@ public class 
OutputAndTimeBoundedSplittableProcessElementInvoker<
     public <T> T sideInput(PCollectionView<T> view) {
       return sideInputReader.get(
           view,
-          view.getWindowingStrategyInternal()
-              .getWindowFn()
+          view.getWindowMappingFn()
               
.getSideInputWindow(Iterables.getOnlyElement(element.getWindows())));
     }
 

http://git-wip-us.apache.org/repos/asf/beam/blob/9e8cf0c5/runners/core-java/src/main/java/org/apache/beam/runners/core/PushbackSideInputDoFnRunner.java
----------------------------------------------------------------------
diff --git 
a/runners/core-java/src/main/java/org/apache/beam/runners/core/PushbackSideInputDoFnRunner.java
 
b/runners/core-java/src/main/java/org/apache/beam/runners/core/PushbackSideInputDoFnRunner.java
index 2962832..4ad20b5 100644
--- 
a/runners/core-java/src/main/java/org/apache/beam/runners/core/PushbackSideInputDoFnRunner.java
+++ 
b/runners/core-java/src/main/java/org/apache/beam/runners/core/PushbackSideInputDoFnRunner.java
@@ -98,7 +98,7 @@ public class PushbackSideInputDoFnRunner<InputT, OutputT> 
implements DoFnRunner<
     }
     for (PCollectionView<?> view : views) {
       BoundedWindow sideInputWindow =
-          
view.getWindowingStrategyInternal().getWindowFn().getSideInputWindow(mainInputWindow);
+          view.getWindowMappingFn().getSideInputWindow(mainInputWindow);
       if (!sideInputReader.isReady(view, sideInputWindow)) {
         return false;
       }

http://git-wip-us.apache.org/repos/asf/beam/blob/9e8cf0c5/runners/core-java/src/main/java/org/apache/beam/runners/core/ReduceFnContextFactory.java
----------------------------------------------------------------------
diff --git 
a/runners/core-java/src/main/java/org/apache/beam/runners/core/ReduceFnContextFactory.java
 
b/runners/core-java/src/main/java/org/apache/beam/runners/core/ReduceFnContextFactory.java
index 66a6ef8..8493474 100644
--- 
a/runners/core-java/src/main/java/org/apache/beam/runners/core/ReduceFnContextFactory.java
+++ 
b/runners/core-java/src/main/java/org/apache/beam/runners/core/ReduceFnContextFactory.java
@@ -514,8 +514,7 @@ class ReduceFnContextFactory<K, InputT, OutputT, W extends 
BoundedWindow> {
         public <T> T sideInput(PCollectionView<T> view) {
           return sideInputReader.get(
               view,
-              view.getWindowingStrategyInternal()
-                  .getWindowFn()
+              view.getWindowMappingFn()
                   .getSideInputWindow(mainInputWindow));
         }
 

http://git-wip-us.apache.org/repos/asf/beam/blob/9e8cf0c5/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleDoFnRunner.java
----------------------------------------------------------------------
diff --git 
a/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleDoFnRunner.java
 
b/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleDoFnRunner.java
index dfa9645..77286b2 100644
--- 
a/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleDoFnRunner.java
+++ 
b/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleDoFnRunner.java
@@ -533,7 +533,7 @@ public class SimpleDoFnRunner<InputT, OutputT> implements 
DoFnRunner<InputT, Out
         }
       }
       return context.sideInput(
-          view, 
view.getWindowingStrategyInternal().getWindowFn().getSideInputWindow(window));
+          view, view.getWindowMappingFn().getSideInputWindow(window));
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/beam/blob/9e8cf0c5/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleOldDoFnRunner.java
----------------------------------------------------------------------
diff --git 
a/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleOldDoFnRunner.java
 
b/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleOldDoFnRunner.java
index c21ed77..c88f1c9 100644
--- 
a/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleOldDoFnRunner.java
+++ 
b/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleOldDoFnRunner.java
@@ -389,7 +389,7 @@ class SimpleOldDoFnRunner<InputT, OutputT> implements 
DoFnRunner<InputT, OutputT
         }
       }
       return context.sideInput(
-          view, 
view.getWindowingStrategyInternal().getWindowFn().getSideInputWindow(window));
+          view, view.getWindowMappingFn().getSideInputWindow(window));
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/beam/blob/9e8cf0c5/runners/core-java/src/test/java/org/apache/beam/runners/core/ReduceFnRunnerTest.java
----------------------------------------------------------------------
diff --git 
a/runners/core-java/src/test/java/org/apache/beam/runners/core/ReduceFnRunnerTest.java
 
b/runners/core-java/src/test/java/org/apache/beam/runners/core/ReduceFnRunnerTest.java
index 1bd717f..0d4d992 100644
--- 
a/runners/core-java/src/test/java/org/apache/beam/runners/core/ReduceFnRunnerTest.java
+++ 
b/runners/core-java/src/test/java/org/apache/beam/runners/core/ReduceFnRunnerTest.java
@@ -65,6 +65,7 @@ import 
org.apache.beam.sdk.transforms.windowing.SlidingWindows;
 import org.apache.beam.sdk.transforms.windowing.Trigger;
 import org.apache.beam.sdk.transforms.windowing.Window.ClosingBehavior;
 import org.apache.beam.sdk.transforms.windowing.WindowFn;
+import org.apache.beam.sdk.transforms.windowing.WindowMappingFn;
 import org.apache.beam.sdk.util.SideInputReader;
 import org.apache.beam.sdk.util.TimeDomain;
 import org.apache.beam.sdk.util.WindowedValue;
@@ -360,8 +361,9 @@ public class ReduceFnRunnerTest {
         WindowingStrategy.of(FixedWindows.of(Duration.millis(2)))
             .withMode(AccumulationMode.ACCUMULATING_FIRED_PANES);
 
-    WindowingStrategy<?, IntervalWindow> sideInputWindowingStrategy =
-        WindowingStrategy.of(FixedWindows.of(Duration.millis(4)));
+    WindowMappingFn<?> sideInputWindowMappingFn =
+        FixedWindows.of(Duration.millis(4)).getDefaultWindowMappingFn();
+    when(mockView.getWindowMappingFn()).thenReturn((WindowMappingFn) 
sideInputWindowMappingFn);
 
     TestOptions options = PipelineOptionsFactory.as(TestOptions.class);
     options.setValue(expectedValue);
@@ -384,10 +386,6 @@ public class ReduceFnRunnerTest {
               }
             });
 
-    @SuppressWarnings({"rawtypes", "unchecked", "unused"})
-    Object suppressWarningsVar = when(mockView.getWindowingStrategyInternal())
-        .thenReturn((WindowingStrategy) sideInputWindowingStrategy);
-
     SumAndVerifyContextFn combineFn = new SumAndVerifyContextFn(mockView, 
expectedValue);
     ReduceFnTester<Integer, Integer, IntervalWindow> tester = 
ReduceFnTester.combining(
         mainInputWindowingStrategy, mockTriggerStateMachine, 
combineFn.<String>asKeyedFn(),

http://git-wip-us.apache.org/repos/asf/beam/blob/9e8cf0c5/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnTester.java
----------------------------------------------------------------------
diff --git 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnTester.java 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnTester.java
index 01c639a..01f0291 100644
--- 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnTester.java
+++ 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnTester.java
@@ -650,8 +650,7 @@ public class DoFnTester<InputT, OutputT> implements 
AutoCloseable {
       Map<BoundedWindow, ?> viewValues = sideInputs.get(view);
       if (viewValues != null) {
         BoundedWindow sideInputWindow =
-            view.getWindowingStrategyInternal()
-                .getWindowFn()
+            view.getWindowMappingFn()
                 .getSideInputWindow(element.getWindow());
         @SuppressWarnings("unchecked")
         T windowValue = (T) viewValues.get(sideInputWindow);

http://git-wip-us.apache.org/repos/asf/beam/blob/9e8cf0c5/sdks/java/core/src/main/java/org/apache/beam/sdk/util/CombineContextFactory.java
----------------------------------------------------------------------
diff --git 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/CombineContextFactory.java
 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/CombineContextFactory.java
index a983057..31d1f64 100644
--- 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/CombineContextFactory.java
+++ 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/CombineContextFactory.java
@@ -83,7 +83,7 @@ public class CombineContextFactory {
         }
 
         BoundedWindow sideInputWindow =
-            
view.getWindowingStrategyInternal().getWindowFn().getSideInputWindow(mainInputWindow);
+            view.getWindowMappingFn().getSideInputWindow(mainInputWindow);
         return sideInputReader.get(view, sideInputWindow);
       }
     };

Reply via email to