Make WindowMappingFn#maximumLookback Configurable but Final. This enforces that the method returns a constant value.
Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/61bb6b4e Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/61bb6b4e Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/61bb6b4e Branch: refs/heads/master Commit: 61bb6b4e301a3675f20728730a2d691a79941156 Parents: 88ffc97 Author: Thomas Groh <tg...@google.com> Authored: Thu Mar 23 15:20:06 2017 -0700 Committer: Thomas Groh <tg...@google.com> Committed: Mon Mar 27 12:55:30 2017 -0700 ---------------------------------------------------------------------- .../apache/beam/sdk/testing/StaticWindows.java | 8 +------ .../sdk/transforms/windowing/GlobalWindows.java | 6 ----- .../windowing/PartitioningWindowFn.java | 6 ----- .../transforms/windowing/SlidingWindows.java | 5 ---- .../transforms/windowing/WindowMappingFn.java | 24 +++++++++++++++++--- .../sdk/util/IdentitySideInputWindowFn.java | 6 ----- 6 files changed, 22 insertions(+), 33 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/beam/blob/61bb6b4e/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/StaticWindows.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/StaticWindows.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/StaticWindows.java index 4be88c8..fde1669 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/StaticWindows.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/StaticWindows.java @@ -103,7 +103,7 @@ final class StaticWindows extends NonMergingWindowFn<Object, BoundedWindow> { @Override public WindowMappingFn<BoundedWindow> getDefaultWindowMappingFn() { - return new WindowMappingFn<BoundedWindow>() { + return new WindowMappingFn<BoundedWindow>(Duration.millis(Long.MAX_VALUE)) { @Override public BoundedWindow getSideInputWindow(BoundedWindow mainWindow) { 
checkArgument( @@ -112,12 +112,6 @@ final class StaticWindows extends NonMergingWindowFn<Object, BoundedWindow> { StaticWindows.class.getSimpleName()); return mainWindow; } - - @Override - public Duration maximumLookback() { - // TODO: This may be unsafe. - return Duration.millis(Long.MAX_VALUE); - } }; } } http://git-wip-us.apache.org/repos/asf/beam/blob/61bb6b4e/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/GlobalWindows.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/GlobalWindows.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/GlobalWindows.java index e91fad1..400be1f 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/GlobalWindows.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/GlobalWindows.java @@ -20,7 +20,6 @@ package org.apache.beam.sdk.transforms.windowing; import java.util.Collection; import java.util.Collections; import org.apache.beam.sdk.coders.Coder; -import org.joda.time.Duration; import org.joda.time.Instant; /** @@ -56,11 +55,6 @@ public class GlobalWindows extends NonMergingWindowFn<Object, GlobalWindow> { public GlobalWindow getSideInputWindow(BoundedWindow mainWindow) { return GlobalWindow.INSTANCE; } - - @Override - public Duration maximumLookback() { - return Duration.ZERO; - } } @Override http://git-wip-us.apache.org/repos/asf/beam/blob/61bb6b4e/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/PartitioningWindowFn.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/PartitioningWindowFn.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/PartitioningWindowFn.java index 40cff8a..40ee68a 100644 --- 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/PartitioningWindowFn.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/PartitioningWindowFn.java @@ -19,7 +19,6 @@ package org.apache.beam.sdk.transforms.windowing; import java.util.Arrays; import java.util.Collection; -import org.joda.time.Duration; import org.joda.time.Instant; /** @@ -52,11 +51,6 @@ public abstract class PartitioningWindowFn<T, W extends BoundedWindow> } return assignWindow(mainWindow.maxTimestamp()); } - - @Override - public Duration maximumLookback() { - return Duration.ZERO; - } }; } http://git-wip-us.apache.org/repos/asf/beam/blob/61bb6b4e/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/SlidingWindows.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/SlidingWindows.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/SlidingWindows.java index b27f4e6..650dc37 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/SlidingWindows.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/SlidingWindows.java @@ -139,11 +139,6 @@ public class SlidingWindows extends NonMergingWindowFn<Object, IntervalWindow> { long lastStart = lastStartFor(mainWindow.maxTimestamp().minus(size)); return new IntervalWindow(new Instant(lastStart + period.getMillis()), size); } - - @Override - public Duration maximumLookback() { - return Duration.ZERO; - } }; } http://git-wip-us.apache.org/repos/asf/beam/blob/61bb6b4e/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/WindowMappingFn.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/WindowMappingFn.java 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/WindowMappingFn.java index 62bf544..910ed98 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/WindowMappingFn.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/WindowMappingFn.java @@ -29,14 +29,30 @@ import org.joda.time.Duration; * {@link BoundMulti#withSideInputs(PCollectionView[]) side input}. */ public abstract class WindowMappingFn<TargetWindowT extends BoundedWindow> implements Serializable { + private final Duration maximumLookback; + + /** + * Create a new {@link WindowMappingFn} with {@link Duration#ZERO zero} maximum lookback. + */ + protected WindowMappingFn() { + this(Duration.ZERO); + } + + /** + * Create a new {@link WindowMappingFn} with the specified maximum lookback. + */ + protected WindowMappingFn(Duration maximumLookback) { + this.maximumLookback = maximumLookback; + } + /** * Returns the window of the side input corresponding to the given window of the main input. */ public abstract TargetWindowT getSideInputWindow(BoundedWindow mainWindow); /** - * The maximum distance between the end of any main input window {@code mainWindow} - * and the end of the side input window returned by {@link #getSideInputWindow(BoundedWindow)} + * The maximum distance between the end of any main input window {@code mainWindow} and the end of + * the side input window returned by {@link #getSideInputWindow(BoundedWindow)} * * <p>A side input window {@code w} becomes unreachable when the input watermarks for all * consumers surpasses the timestamp: @@ -45,5 +61,7 @@ public abstract class WindowMappingFn<TargetWindowT extends BoundedWindow> imple * * <p>At this point, every main input window that could map to {@code w} is expired. 
*/ - public abstract Duration maximumLookback(); + public final Duration maximumLookback() { + return maximumLookback; + } } http://git-wip-us.apache.org/repos/asf/beam/blob/61bb6b4e/sdks/java/core/src/test/java/org/apache/beam/sdk/util/IdentitySideInputWindowFn.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/IdentitySideInputWindowFn.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/IdentitySideInputWindowFn.java index 60d9afe..2171466 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/IdentitySideInputWindowFn.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/IdentitySideInputWindowFn.java @@ -25,7 +25,6 @@ import org.apache.beam.sdk.transforms.windowing.GlobalWindow; import org.apache.beam.sdk.transforms.windowing.NonMergingWindowFn; import org.apache.beam.sdk.transforms.windowing.WindowFn; import org.apache.beam.sdk.transforms.windowing.WindowMappingFn; -import org.joda.time.Duration; /** * A {@link WindowFn} for use during tests that returns the input window for calls to @@ -56,11 +55,6 @@ public class IdentitySideInputWindowFn extends NonMergingWindowFn<Integer, Bound public BoundedWindow getSideInputWindow(BoundedWindow window) { return window; } - - @Override - public Duration maximumLookback() { - return Duration.ZERO; - } }; } }