Remove getSideInputWindow Callers should instead get the Default WindowMappingFn if no explicit WindowMappingFn is available.
Migrate all existing callers within the SDK and runners. Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/79b066da Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/79b066da Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/79b066da Branch: refs/heads/jstorm-runner Commit: 79b066da4ed26fae63035fb16c03508ea77bf6db Parents: 075b621 Author: Thomas Groh <[email protected]> Authored: Tue Apr 4 10:38:36 2017 -0700 Committer: Thomas Groh <[email protected]> Committed: Mon Apr 17 13:09:39 2017 -0700 ---------------------------------------------------------------------- .../java/org/apache/beam/runners/core/OldDoFn.java | 3 ++- .../beam/runners/spark/util/SparkSideInputReader.java | 3 +-- .../apache/beam/sdk/transforms/windowing/WindowFn.java | 13 ------------- .../org/apache/beam/sdk/testing/StaticWindowsTest.java | 10 +++++++--- 4 files changed, 10 insertions(+), 19 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/beam/blob/79b066da/runners/core-java/src/main/java/org/apache/beam/runners/core/OldDoFn.java ---------------------------------------------------------------------- diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/OldDoFn.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/OldDoFn.java index 507ee50..323edf9 100644 --- a/runners/core-java/src/main/java/org/apache/beam/runners/core/OldDoFn.java +++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/OldDoFn.java @@ -42,6 +42,7 @@ import org.apache.beam.sdk.transforms.display.DisplayData; import org.apache.beam.sdk.transforms.display.HasDisplayData; import org.apache.beam.sdk.transforms.windowing.BoundedWindow; import org.apache.beam.sdk.transforms.windowing.PaneInfo; +import org.apache.beam.sdk.transforms.windowing.WindowMappingFn; import org.apache.beam.sdk.values.PCollectionView; import org.apache.beam.sdk.values.TupleTag; import org.joda.time.Duration; @@ -241,7 +242,7 @@ public abstract class OldDoFn<InputT, OutputT> implements Serializable, HasDispl * window of the main input element. * * <p>See - * {@link org.apache.beam.sdk.transforms.windowing.WindowFn#getSideInputWindow} + * {@link WindowMappingFn#getSideInputWindow} * for how this corresponding window is determined. * * @throws IllegalArgumentException if this is not a side input http://git-wip-us.apache.org/repos/asf/beam/blob/79b066da/runners/spark/src/main/java/org/apache/beam/runners/spark/util/SparkSideInputReader.java ---------------------------------------------------------------------- diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/util/SparkSideInputReader.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/util/SparkSideInputReader.java index c8e9850..d6e1a94 100644 --- a/runners/spark/src/main/java/org/apache/beam/runners/spark/util/SparkSideInputReader.java +++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/util/SparkSideInputReader.java @@ -54,9 +54,8 @@ public class SparkSideInputReader implements SideInputReader { checkNotNull(windowedBroadcastHelper, "SideInput for view " + view + " is not available."); //--- sideInput window - WindowingStrategy<?, ?> sideInputWindowStrategy = windowedBroadcastHelper.getKey(); final BoundedWindow sideInputWindow = - sideInputWindowStrategy.getWindowFn().getSideInputWindow(window); + view.getWindowMappingFn().getSideInputWindow(window); //--- match the appropriate sideInput window. // a tag will point to all matching sideInputs, that is all windows. http://git-wip-us.apache.org/repos/asf/beam/blob/79b066da/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/WindowFn.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/WindowFn.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/WindowFn.java index 2f9e6c1..5ebbb41 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/WindowFn.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/WindowFn.java @@ -124,19 +124,6 @@ public abstract class WindowFn<T, W extends BoundedWindow> public abstract Coder<W> windowCoder(); /** - * Returns the window of the side input corresponding to the given window of - * the main input. If not overridden, will use the window returned by calling - * {@link WindowMappingFn#getSideInputWindow(BoundedWindow)} on the result of - * {@link #getDefaultWindowMappingFn()}. - * - * @deprecated see {@link #getDefaultWindowMappingFn()} - */ - @Deprecated - public W getSideInputWindow(BoundedWindow window) { - return getDefaultWindowMappingFn().getSideInputWindow(window); - } - - /** * Returns the default {@link WindowMappingFn} to use to map main input windows to side input * windows. This should accept arbitrary main input windows, and produce a {@link BoundedWindow} * that can be produced by this {@link WindowFn}. http://git-wip-us.apache.org/repos/asf/beam/blob/79b066da/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/StaticWindowsTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/StaticWindowsTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/StaticWindowsTest.java index e662619..7ee48c8 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/StaticWindowsTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/testing/StaticWindowsTest.java @@ -70,8 +70,12 @@ public class StaticWindowsTest { WindowFn<Object, BoundedWindow> fn = StaticWindows.of(IntervalWindow.getCoder(), ImmutableList.of(first, second)); - assertThat(fn.getSideInputWindow(first), Matchers.<BoundedWindow>equalTo(first)); - assertThat(fn.getSideInputWindow(second), Matchers.<BoundedWindow>equalTo(second)); + assertThat( + fn.getDefaultWindowMappingFn().getSideInputWindow(first), + Matchers.<BoundedWindow>equalTo(first)); + assertThat( + fn.getDefaultWindowMappingFn().getSideInputWindow(second), + Matchers.<BoundedWindow>equalTo(second)); } @Test @@ -80,7 +84,7 @@ public class StaticWindowsTest { StaticWindows.of(IntervalWindow.getCoder(), ImmutableList.of(second)); thrown.expect(IllegalArgumentException.class); thrown.expectMessage("contains"); - fn.getSideInputWindow(first); + fn.getDefaultWindowMappingFn().getSideInputWindow(first); } @Test
