Use WindowMappingFn where possible. Migrates callers away from the use of WindowingStrategyInternal, permitting future changes to have a configurable WindowMappingFn.
Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/9e8cf0c5 Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/9e8cf0c5 Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/9e8cf0c5 Branch: refs/heads/master Commit: 9e8cf0c5ea7f47a9d7ec05272d56508962c86918 Parents: cc5f78d Author: Thomas Groh <tg...@google.com> Authored: Wed Apr 5 11:53:03 2017 -0700 Committer: Thomas Groh <tg...@google.com> Committed: Wed Apr 5 18:08:03 2017 -0700 ---------------------------------------------------------------------- ...tputAndTimeBoundedSplittableProcessElementInvoker.java | 3 +-- .../beam/runners/core/PushbackSideInputDoFnRunner.java | 2 +- .../apache/beam/runners/core/ReduceFnContextFactory.java | 3 +-- .../org/apache/beam/runners/core/SimpleDoFnRunner.java | 2 +- .../org/apache/beam/runners/core/SimpleOldDoFnRunner.java | 2 +- .../org/apache/beam/runners/core/ReduceFnRunnerTest.java | 10 ++++------ .../java/org/apache/beam/sdk/transforms/DoFnTester.java | 3 +-- .../org/apache/beam/sdk/util/CombineContextFactory.java | 2 +- 8 files changed, 11 insertions(+), 16 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/beam/blob/9e8cf0c5/runners/core-java/src/main/java/org/apache/beam/runners/core/OutputAndTimeBoundedSplittableProcessElementInvoker.java ---------------------------------------------------------------------- diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/OutputAndTimeBoundedSplittableProcessElementInvoker.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/OutputAndTimeBoundedSplittableProcessElementInvoker.java index 5aa7605..357094c 100644 --- a/runners/core-java/src/main/java/org/apache/beam/runners/core/OutputAndTimeBoundedSplittableProcessElementInvoker.java +++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/OutputAndTimeBoundedSplittableProcessElementInvoker.java @@ 
-226,8 +226,7 @@ public class OutputAndTimeBoundedSplittableProcessElementInvoker< public <T> T sideInput(PCollectionView<T> view) { return sideInputReader.get( view, - view.getWindowingStrategyInternal() - .getWindowFn() + view.getWindowMappingFn() .getSideInputWindow(Iterables.getOnlyElement(element.getWindows()))); } http://git-wip-us.apache.org/repos/asf/beam/blob/9e8cf0c5/runners/core-java/src/main/java/org/apache/beam/runners/core/PushbackSideInputDoFnRunner.java ---------------------------------------------------------------------- diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/PushbackSideInputDoFnRunner.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/PushbackSideInputDoFnRunner.java index 2962832..4ad20b5 100644 --- a/runners/core-java/src/main/java/org/apache/beam/runners/core/PushbackSideInputDoFnRunner.java +++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/PushbackSideInputDoFnRunner.java @@ -98,7 +98,7 @@ public class PushbackSideInputDoFnRunner<InputT, OutputT> implements DoFnRunner< } for (PCollectionView<?> view : views) { BoundedWindow sideInputWindow = - view.getWindowingStrategyInternal().getWindowFn().getSideInputWindow(mainInputWindow); + view.getWindowMappingFn().getSideInputWindow(mainInputWindow); if (!sideInputReader.isReady(view, sideInputWindow)) { return false; } http://git-wip-us.apache.org/repos/asf/beam/blob/9e8cf0c5/runners/core-java/src/main/java/org/apache/beam/runners/core/ReduceFnContextFactory.java ---------------------------------------------------------------------- diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/ReduceFnContextFactory.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/ReduceFnContextFactory.java index 66a6ef8..8493474 100644 --- a/runners/core-java/src/main/java/org/apache/beam/runners/core/ReduceFnContextFactory.java +++ 
b/runners/core-java/src/main/java/org/apache/beam/runners/core/ReduceFnContextFactory.java @@ -514,8 +514,7 @@ class ReduceFnContextFactory<K, InputT, OutputT, W extends BoundedWindow> { public <T> T sideInput(PCollectionView<T> view) { return sideInputReader.get( view, - view.getWindowingStrategyInternal() - .getWindowFn() + view.getWindowMappingFn() .getSideInputWindow(mainInputWindow)); } http://git-wip-us.apache.org/repos/asf/beam/blob/9e8cf0c5/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleDoFnRunner.java ---------------------------------------------------------------------- diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleDoFnRunner.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleDoFnRunner.java index dfa9645..77286b2 100644 --- a/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleDoFnRunner.java +++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleDoFnRunner.java @@ -533,7 +533,7 @@ public class SimpleDoFnRunner<InputT, OutputT> implements DoFnRunner<InputT, Out } } return context.sideInput( - view, view.getWindowingStrategyInternal().getWindowFn().getSideInputWindow(window)); + view, view.getWindowMappingFn().getSideInputWindow(window)); } @Override http://git-wip-us.apache.org/repos/asf/beam/blob/9e8cf0c5/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleOldDoFnRunner.java ---------------------------------------------------------------------- diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleOldDoFnRunner.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleOldDoFnRunner.java index c21ed77..c88f1c9 100644 --- a/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleOldDoFnRunner.java +++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/SimpleOldDoFnRunner.java @@ -389,7 +389,7 @@ class SimpleOldDoFnRunner<InputT, OutputT> implements DoFnRunner<InputT, 
OutputT } } return context.sideInput( - view, view.getWindowingStrategyInternal().getWindowFn().getSideInputWindow(window)); + view, view.getWindowMappingFn().getSideInputWindow(window)); } @Override http://git-wip-us.apache.org/repos/asf/beam/blob/9e8cf0c5/runners/core-java/src/test/java/org/apache/beam/runners/core/ReduceFnRunnerTest.java ---------------------------------------------------------------------- diff --git a/runners/core-java/src/test/java/org/apache/beam/runners/core/ReduceFnRunnerTest.java b/runners/core-java/src/test/java/org/apache/beam/runners/core/ReduceFnRunnerTest.java index 1bd717f..0d4d992 100644 --- a/runners/core-java/src/test/java/org/apache/beam/runners/core/ReduceFnRunnerTest.java +++ b/runners/core-java/src/test/java/org/apache/beam/runners/core/ReduceFnRunnerTest.java @@ -65,6 +65,7 @@ import org.apache.beam.sdk.transforms.windowing.SlidingWindows; import org.apache.beam.sdk.transforms.windowing.Trigger; import org.apache.beam.sdk.transforms.windowing.Window.ClosingBehavior; import org.apache.beam.sdk.transforms.windowing.WindowFn; +import org.apache.beam.sdk.transforms.windowing.WindowMappingFn; import org.apache.beam.sdk.util.SideInputReader; import org.apache.beam.sdk.util.TimeDomain; import org.apache.beam.sdk.util.WindowedValue; @@ -360,8 +361,9 @@ public class ReduceFnRunnerTest { WindowingStrategy.of(FixedWindows.of(Duration.millis(2))) .withMode(AccumulationMode.ACCUMULATING_FIRED_PANES); - WindowingStrategy<?, IntervalWindow> sideInputWindowingStrategy = - WindowingStrategy.of(FixedWindows.of(Duration.millis(4))); + WindowMappingFn<?> sideInputWindowMappingFn = + FixedWindows.of(Duration.millis(4)).getDefaultWindowMappingFn(); + when(mockView.getWindowMappingFn()).thenReturn((WindowMappingFn) sideInputWindowMappingFn); TestOptions options = PipelineOptionsFactory.as(TestOptions.class); options.setValue(expectedValue); @@ -384,10 +386,6 @@ public class ReduceFnRunnerTest { } }); - @SuppressWarnings({"rawtypes", "unchecked", 
"unused"}) - Object suppressWarningsVar = when(mockView.getWindowingStrategyInternal()) - .thenReturn((WindowingStrategy) sideInputWindowingStrategy); - SumAndVerifyContextFn combineFn = new SumAndVerifyContextFn(mockView, expectedValue); ReduceFnTester<Integer, Integer, IntervalWindow> tester = ReduceFnTester.combining( mainInputWindowingStrategy, mockTriggerStateMachine, combineFn.<String>asKeyedFn(), http://git-wip-us.apache.org/repos/asf/beam/blob/9e8cf0c5/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnTester.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnTester.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnTester.java index 01c639a..01f0291 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnTester.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnTester.java @@ -650,8 +650,7 @@ public class DoFnTester<InputT, OutputT> implements AutoCloseable { Map<BoundedWindow, ?> viewValues = sideInputs.get(view); if (viewValues != null) { BoundedWindow sideInputWindow = - view.getWindowingStrategyInternal() - .getWindowFn() + view.getWindowMappingFn() .getSideInputWindow(element.getWindow()); @SuppressWarnings("unchecked") T windowValue = (T) viewValues.get(sideInputWindow); http://git-wip-us.apache.org/repos/asf/beam/blob/9e8cf0c5/sdks/java/core/src/main/java/org/apache/beam/sdk/util/CombineContextFactory.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/CombineContextFactory.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/CombineContextFactory.java index a983057..31d1f64 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/CombineContextFactory.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/CombineContextFactory.java @@ -83,7 +83,7 @@ 
public class CombineContextFactory { } BoundedWindow sideInputWindow = - view.getWindowingStrategyInternal().getWindowFn().getSideInputWindow(mainInputWindow); + view.getWindowMappingFn().getSideInputWindow(mainInputWindow); return sideInputReader.get(view, sideInputWindow); } };