Repository: incubator-beam Updated Branches: refs/heads/master 199ec2e10 -> 7725a47a7
Make WindowingStrategy combine WindowFn with OutputTimeFn Previously: - Any user-specified OutputTimeFn overrode the WindowFn#getOutputTime - WindowFn#getOutputTimeFn provided a default OutputTimeFn - The default varied from "earliest" to "end of window" Now: - The user-specified OutputTimeFn is used to combine the WindowFn's assigned output timestamps. - The WindowFn does not provide the default. - The default is always to output at end of window. Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/3755c557 Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/3755c557 Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/3755c557 Branch: refs/heads/master Commit: 3755c5579de3c46202a32b3ac2774ab440cf42f3 Parents: e63311f Author: Kenneth Knowles <k...@google.com> Authored: Thu May 5 19:33:16 2016 -0700 Committer: Kenneth Knowles <k...@google.com> Committed: Mon May 9 10:10:57 2016 -0700 ---------------------------------------------------------------------- .../FlinkGroupAlsoByWindowWrapper.java | 2 +- .../flink/streaming/GroupAlsoByWindowTest.java | 7 +- .../beam/sdk/testing/WindowFnTestUtils.java | 4 +- .../sdk/transforms/windowing/OutputTimeFn.java | 1 - .../beam/sdk/transforms/windowing/Sessions.java | 8 -- .../transforms/windowing/SlidingWindows.java | 23 ++---- .../beam/sdk/transforms/windowing/WindowFn.java | 82 ++++---------------- .../apache/beam/sdk/util/WindowingStrategy.java | 55 ++++++++++++- .../sdk/transforms/join/CoGroupByKeyTest.java | 7 +- .../sdk/transforms/windowing/WindowTest.java | 31 +++++--- .../sdk/transforms/windowing/WindowingTest.java | 9 ++- .../sdk/util/GroupAlsoByWindowsProperties.java | 65 +++------------- ...oupAlsoByWindowsViaOutputBufferDoFnTest.java | 8 +- .../beam/sdk/util/ReduceFnRunnerTest.java | 6 ++ .../apache/beam/sdk/util/ReduceFnTester.java | 9 ++- .../CopyOnAccessInMemoryStateInternalsTest.java | 6 +- 16 files changed, 136 insertions(+), 187 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/3755c557/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/FlinkGroupAlsoByWindowWrapper.java ---------------------------------------------------------------------- diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/FlinkGroupAlsoByWindowWrapper.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/FlinkGroupAlsoByWindowWrapper.java index 0306aa1..9d2cad8 100644 --- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/FlinkGroupAlsoByWindowWrapper.java +++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/FlinkGroupAlsoByWindowWrapper.java @@ -412,7 +412,7 @@ public class FlinkGroupAlsoByWindowWrapper<K, VIN, VACC, VOUT> FlinkStateInternals<K> stateInternals = perKeyStateInternals.get(key); if (stateInternals == null) { Coder<? extends BoundedWindow> windowCoder = this.windowingStrategy.getWindowFn().windowCoder(); - OutputTimeFn<? super BoundedWindow> outputTimeFn = this.windowingStrategy.getWindowFn().getOutputTimeFn(); + OutputTimeFn<? super BoundedWindow> outputTimeFn = this.windowingStrategy.getOutputTimeFn(); stateInternals = new FlinkStateInternals<>(key, inputKvCoder.getKeyCoder(), windowCoder, outputTimeFn); perKeyStateInternals.put(key, stateInternals); } http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/3755c557/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/streaming/GroupAlsoByWindowTest.java ---------------------------------------------------------------------- diff --git a/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/streaming/GroupAlsoByWindowTest.java b/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/streaming/GroupAlsoByWindowTest.java index f3ceba7..c76af65 100644 --- a/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/streaming/GroupAlsoByWindowTest.java +++ b/runners/flink/runner/src/test/java/org/apache/beam/runners/flink/streaming/GroupAlsoByWindowTest.java @@ -30,6 +30,7 @@ import org.apache.beam.sdk.transforms.windowing.AfterWatermark; import org.apache.beam.sdk.transforms.windowing.BoundedWindow; import org.apache.beam.sdk.transforms.windowing.FixedWindows; import org.apache.beam.sdk.transforms.windowing.IntervalWindow; +import org.apache.beam.sdk.transforms.windowing.OutputTimeFns; import org.apache.beam.sdk.transforms.windowing.PaneInfo; import org.apache.beam.sdk.transforms.windowing.Repeatedly; import org.apache.beam.sdk.transforms.windowing.Sessions; @@ -58,16 +59,19 @@ public class GroupAlsoByWindowTest { private final WindowingStrategy slidingWindowWithAfterWatermarkTriggerStrategy = WindowingStrategy.of(SlidingWindows.of(Duration.standardSeconds(10)).every(Duration.standardSeconds(5))) + .withOutputTimeFn(OutputTimeFns.outputAtEarliestInputTimestamp()) .withTrigger(AfterWatermark.pastEndOfWindow()).withMode(WindowingStrategy.AccumulationMode.ACCUMULATING_FIRED_PANES); private final WindowingStrategy sessionWindowingStrategy = WindowingStrategy.of(Sessions.withGapDuration(Duration.standardSeconds(2))) .withTrigger(Repeatedly.forever(AfterWatermark.pastEndOfWindow())) .withMode(WindowingStrategy.AccumulationMode.ACCUMULATING_FIRED_PANES) + .withOutputTimeFn(OutputTimeFns.outputAtEarliestInputTimestamp()) .withAllowedLateness(Duration.standardSeconds(100)); private final WindowingStrategy fixedWindowingStrategy = - WindowingStrategy.of(FixedWindows.of(Duration.standardSeconds(10))); + WindowingStrategy.of(FixedWindows.of(Duration.standardSeconds(10))) + .withOutputTimeFn(OutputTimeFns.outputAtEarliestInputTimestamp()); private final WindowingStrategy fixedWindowWithCountTriggerStrategy = fixedWindowingStrategy.withTrigger(AfterPane.elementCountAtLeast(5)); @@ -94,6 +98,7 @@ public class GroupAlsoByWindowTest { public void testWithLateness() throws Exception { WindowingStrategy strategy = WindowingStrategy.of(FixedWindows.of(Duration.standardSeconds(2))) .withMode(WindowingStrategy.AccumulationMode.ACCUMULATING_FIRED_PANES) + .withOutputTimeFn(OutputTimeFns.outputAtEarliestInputTimestamp()) .withAllowedLateness(Duration.millis(1000)); long initialTime = 0L; Pipeline pipeline = FlinkTestPipeline.createForStreaming(); http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/3755c557/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/WindowFnTestUtils.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/WindowFnTestUtils.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/WindowFnTestUtils.java index 2566a12..a4130df 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/WindowFnTestUtils.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/WindowFnTestUtils.java @@ -201,7 +201,7 @@ public class WindowFnTestUtils { Instant instant = new Instant(timestamp); for (W window : windows) { - Instant outputTimestamp = windowFn.getOutputTimeFn().assignOutputTime(instant, window); + Instant outputTimestamp = windowFn.getOutputTime(instant, window); assertFalse("getOutputTime must be greater than or equal to input timestamp", outputTimestamp.isBefore(instant)); assertFalse("getOutputTime must be less than or equal to the max timestamp", @@ -232,7 +232,7 @@ public class WindowFnTestUtils { Instant instant = new Instant(timestamp); Instant endOfPrevious = null; for (W window : sortedWindows) { - Instant outputTimestamp = windowFn.getOutputTimeFn().assignOutputTime(instant, window); + Instant outputTimestamp = windowFn.getOutputTime(instant, window); if (endOfPrevious == null) { // If this is the first window, the output timestamp can be anything, as long as it is in // the valid range. http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/3755c557/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/OutputTimeFn.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/OutputTimeFn.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/OutputTimeFn.java index 3deea56..7cf870a 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/OutputTimeFn.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/OutputTimeFn.java @@ -61,7 +61,6 @@ public abstract class OutputTimeFn<W extends BoundedWindow> implements Serializa * Returns the output timestamp to use for data depending on the given * {@code inputTimestamp} in the specified {@code window}. * - * * <p>The result of this method must be between {@code inputTimestamp} and * {@code window.maxTimestamp()} (inclusive on both sides). * http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/3755c557/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/Sessions.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/Sessions.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/Sessions.java index 8e8a005..788566e 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/Sessions.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/Sessions.java @@ -17,8 +17,6 @@ */ package org.apache.beam.sdk.transforms.windowing; -import org.apache.beam.sdk.annotations.Experimental; -import org.apache.beam.sdk.annotations.Experimental.Kind; import org.apache.beam.sdk.coders.Coder; import org.apache.beam.sdk.transforms.display.DisplayData; @@ -88,12 +86,6 @@ public class Sessions extends WindowFn<Object, IntervalWindow> { throw new UnsupportedOperationException("Sessions is not allowed in side inputs"); } - @Experimental(Kind.OUTPUT_TIME) - @Override - public OutputTimeFn<? super IntervalWindow> getOutputTimeFn() { - return OutputTimeFns.outputAtEarliestInputTimestamp(); - } - public Duration getGapDuration() { return gapDuration; } http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/3755c557/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/SlidingWindows.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/SlidingWindows.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/SlidingWindows.java index 4153e21..62c2738 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/SlidingWindows.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/SlidingWindows.java @@ -185,26 +185,17 @@ public class SlidingWindows extends NonMergingWindowFn<Object, IntervalWindow> { /** * Ensures that later sliding windows have an output time that is past the end of earlier windows. * - * <p>If this is the earliest sliding window containing {@code inputTimestamp}, that's fine. + * <p> + * If this is the earliest sliding window containing {@code inputTimestamp}, that's fine. * Otherwise, we pick the earliest time that doesn't overlap with earlier windows. */ @Experimental(Kind.OUTPUT_TIME) @Override - public OutputTimeFn<? super IntervalWindow> getOutputTimeFn() { - return new OutputTimeFn.Defaults<BoundedWindow>() { - @Override - public Instant assignOutputTime(Instant inputTimestamp, BoundedWindow window) { - Instant startOfLastSegment = window.maxTimestamp().minus(period); - return startOfLastSegment.isBefore(inputTimestamp) - ? inputTimestamp - : startOfLastSegment.plus(1); - } - - @Override - public boolean dependsOnlyOnEarliestInputTimestamp() { - return true; - } - }; + public Instant getOutputTime(Instant inputTimestamp, IntervalWindow window) { + Instant startOfLastSegment = window.maxTimestamp().minus(period); + return startOfLastSegment.isBefore(inputTimestamp) + ? inputTimestamp + : startOfLastSegment.plus(1); } @Override http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/3755c557/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/WindowFn.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/WindowFn.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/WindowFn.java index e291bee..41833f8 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/WindowFn.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/WindowFn.java @@ -22,9 +22,6 @@ import org.apache.beam.sdk.annotations.Experimental.Kind; import org.apache.beam.sdk.coders.Coder; import org.apache.beam.sdk.transforms.display.DisplayData; import org.apache.beam.sdk.transforms.display.HasDisplayData; -import org.apache.beam.sdk.util.WindowingStrategy; - -import com.google.common.collect.Ordering; import org.joda.time.Instant; @@ -136,34 +133,24 @@ public abstract class WindowFn<T, W extends BoundedWindow> public abstract W getSideInputWindow(final BoundedWindow window); /** - * @deprecated Implement {@link #getOutputTimeFn} to return one of the appropriate - * {@link OutputTimeFns}, or a custom {@link OutputTimeFn} extending - * {@link OutputTimeFn.Defaults}. - */ - @Deprecated - @Experimental(Kind.OUTPUT_TIME) - public Instant getOutputTime(Instant inputTimestamp, W window) { - return getOutputTimeFn().assignOutputTime(inputTimestamp, window); - } - - /** - * Provides a default implementation for {@link WindowingStrategy#getOutputTimeFn()}. - * See the full specification there. + * Returns the output timestamp to use for data depending on the given + * {@code inputTimestamp} in the specified {@code window}. * - * <p>If this {@link WindowFn} doesn't produce overlapping windows, this need not (and probably - * should not) override any of the default implementations in {@link OutputTimeFn.Defaults}. + * <p>The result of this method must be between {@code inputTimestamp} and + * {@code window.maxTimestamp()} (inclusive on both sides). * - * <p>If this {@link WindowFn} does produce overlapping windows that can be predicted here, it is - * suggested that the result in later overlapping windows is past the end of earlier windows so - * that the later windows don't prevent the watermark from progressing past the end of the earlier - * window. + * <p>This function must be monotonic across input timestamps. Specifically, if {@code A < B}, + * then {@code getOutputTime(A, window) <= getOutputTime(B, window)}. * - * <p>For example, a timestamp in a sliding window should be moved past the beginning of the next - * sliding window. See {@link SlidingWindows#getOutputTimeFn}. + * <p>For a {@link WindowFn} that doesn't produce overlapping windows, this can (and typically + * should) just return {@code inputTimestamp}. In the presence of overlapping windows, it is + * suggested that the result in later overlapping windows is past the end of earlier windows + * so that the later windows don't prevent the watermark from + * progressing past the end of the earlier window. */ @Experimental(Kind.OUTPUT_TIME) - public OutputTimeFn<? super W> getOutputTimeFn() { - return new OutputAtEarliestAssignedTimestamp<>(this); + public Instant getOutputTime(Instant inputTimestamp, W window) { + return inputTimestamp; } /** @@ -189,47 +176,4 @@ public abstract class WindowFn<T, W extends BoundedWindow> @Override public void populateDisplayData(DisplayData.Builder builder) { } - - /** - * A compatibility adapter that will return the assigned timestamps according to the - * {@link WindowFn}, which was the prior policy. Specifying the assigned output timestamps - * on the {@link WindowFn} is now deprecated. - */ - private static class OutputAtEarliestAssignedTimestamp<W extends BoundedWindow> - extends OutputTimeFn.Defaults<W> { - - private final WindowFn<?, W> windowFn; - - public OutputAtEarliestAssignedTimestamp(WindowFn<?, W> windowFn) { - this.windowFn = windowFn; - } - - /** - * {@inheritDoc} - * - * @return the result of {@link WindowFn#getOutputTime windowFn.getOutputTime()}. - */ - @Override - @SuppressWarnings("deprecation") // this is an adapter for the deprecated behavior - public Instant assignOutputTime(Instant timestamp, W window) { - return windowFn.getOutputTime(timestamp, window); - } - - @Override - public Instant combine(Instant outputTime, Instant otherOutputTime) { - return Ordering.natural().min(outputTime, otherOutputTime); - } - - /** - * {@inheritDoc} - * - * @return {@code true}. When the {@link OutputTimeFn} is not overridden by {@link WindowFn} - * or {@link WindowingStrategy}, the minimum output timestamp is taken, which depends - * only on the minimum input timestamp by monotonicity of {@link #assignOutputTime}. - */ - @Override - public boolean dependsOnlyOnEarliestInputTimestamp() { - return true; - } - } } http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/3755c557/sdks/java/core/src/main/java/org/apache/beam/sdk/util/WindowingStrategy.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/WindowingStrategy.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/WindowingStrategy.java index a82f2b3..d98793f 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/WindowingStrategy.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/WindowingStrategy.java @@ -23,6 +23,7 @@ import org.apache.beam.sdk.transforms.windowing.DefaultTrigger; import org.apache.beam.sdk.transforms.windowing.GlobalWindow; import org.apache.beam.sdk.transforms.windowing.GlobalWindows; import org.apache.beam.sdk.transforms.windowing.OutputTimeFn; +import org.apache.beam.sdk.transforms.windowing.OutputTimeFns; import org.apache.beam.sdk.transforms.windowing.Trigger; import org.apache.beam.sdk.transforms.windowing.Window.ClosingBehavior; import org.apache.beam.sdk.transforms.windowing.WindowFn; @@ -30,8 +31,10 @@ import org.apache.beam.sdk.transforms.windowing.WindowFn; import com.google.common.base.MoreObjects; import org.joda.time.Duration; +import org.joda.time.Instant; import java.io.Serializable; +import java.util.Collections; import java.util.Objects; /** @@ -99,7 +102,7 @@ public class WindowingStrategy<T, W extends BoundedWindow> implements Serializab ExecutableTrigger.create(DefaultTrigger.<W>of()), false, AccumulationMode.DISCARDING_FIRED_PANES, false, DEFAULT_ALLOWED_LATENESS, false, - windowFn.getOutputTimeFn(), false, + OutputTimeFns.outputAtEndOfWindow(), false, ClosingBehavior.FIRE_IF_NON_EMPTY); } @@ -182,7 +185,7 @@ public class WindowingStrategy<T, W extends BoundedWindow> implements Serializab // The onus of type correctness falls on the callee. @SuppressWarnings("unchecked") OutputTimeFn<? super W> newOutputTimeFn = (OutputTimeFn<? super W>) - (outputTimeFnSpecified ? outputTimeFn : typedWindowFn.getOutputTimeFn()); + new CombineWindowFnOutputTimes<W>(outputTimeFn, typedWindowFn); return new WindowingStrategy<T, W>( typedWindowFn, @@ -223,12 +226,15 @@ public class WindowingStrategy<T, W extends BoundedWindow> implements Serializab @SuppressWarnings("unchecked") OutputTimeFn<? super W> typedOutputTimeFn = (OutputTimeFn<? super W>) outputTimeFn; + OutputTimeFn<? super W> newOutputTimeFn = + new CombineWindowFnOutputTimes<W>(typedOutputTimeFn, windowFn); + return new WindowingStrategy<T, W>( windowFn, trigger, triggerSpecified, mode, modeSpecified, allowedLateness, allowedLatenessSpecified, - typedOutputTimeFn, true, + newOutputTimeFn, true, closingBehavior); } @@ -265,4 +271,47 @@ public class WindowingStrategy<T, W extends BoundedWindow> implements Serializab return Objects.hash(triggerSpecified, allowedLatenessSpecified, modeSpecified, windowFn, trigger, mode, allowedLateness, closingBehavior); } + + /** + * An {@link OutputTimeFn} that uses {@link WindowFn#getOutputTime} to assign initial timestamps + * but then combines and merges according to a given {@link OutputTimeFn}. + * + * <ul> + * <li>The {@link WindowFn#getOutputTime} allows adjustments such as that whereby + * {@link SlidingWindows#getOutputTime} moves elements later in time to avoid holding up + * progress downstream.</li> + * <li>Then, when multiple elements are buffered for output, the output timestamp of the + * result is calculated using {@link OutputTimeFn#combine}.</li> + * <li>In the case of a merging {@link WindowFn}, the output timestamp when windows merge + * is calculated using {@link OutputTimeFn#merge}.</li> + * </ul> + */ + private static class CombineWindowFnOutputTimes<W extends BoundedWindow> + extends OutputTimeFn.Defaults<W> { + + private final OutputTimeFn<? super W> outputTimeFn; + private final WindowFn<?, W> windowFn; + + public CombineWindowFnOutputTimes( + OutputTimeFn<? super W> outputTimeFn, WindowFn<?, W> windowFn) { + this.outputTimeFn = outputTimeFn; + this.windowFn = windowFn; + } + + @Override + public Instant assignOutputTime(Instant inputTimestamp, W window) { + return outputTimeFn.merge( + window, Collections.singleton(windowFn.getOutputTime(inputTimestamp, window))); + } + + @Override + public Instant combine(Instant timestamp, Instant otherTimestamp) { + return outputTimeFn.combine(timestamp, otherTimestamp); + } + + @Override + public Instant merge(W newWindow, Iterable<? extends Instant> timestamps) { + return outputTimeFn.merge(newWindow, timestamps); + } + } } http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/3755c557/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/join/CoGroupByKeyTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/join/CoGroupByKeyTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/join/CoGroupByKeyTest.java index 609f454..f4ce2ca 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/join/CoGroupByKeyTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/join/CoGroupByKeyTest.java @@ -36,6 +36,7 @@ import org.apache.beam.sdk.transforms.ParDo; import org.apache.beam.sdk.transforms.SerializableFunction; import org.apache.beam.sdk.transforms.windowing.BoundedWindow; import org.apache.beam.sdk.transforms.windowing.FixedWindows; +import org.apache.beam.sdk.transforms.windowing.OutputTimeFns; import org.apache.beam.sdk.transforms.windowing.Window; import org.apache.beam.sdk.values.KV; import org.apache.beam.sdk.values.PCollection; @@ -239,7 +240,8 @@ public class CoGroupByKeyTest implements Serializable { idToClick, Arrays.asList(0L, 2L, 4L, 6L, 8L)) .apply("WindowClicks", Window.<KV<Integer, String>>into( - FixedWindows.of(new Duration(4)))); + FixedWindows.of(new Duration(4))) + .withOutputTimeFn(OutputTimeFns.outputAtEarliestInputTimestamp())); PCollection<KV<Integer, String>> purchasesTable = createInput("CreatePurchases", @@ -247,7 +249,8 @@ public class CoGroupByKeyTest implements Serializable { idToPurchases, Arrays.asList(1L, 2L, 3L, 4L, 5L, 6L, 7L, 8L, 9L, 10L)) .apply("WindowPurchases", Window.<KV<Integer, String>>into( - FixedWindows.of(new Duration(4)))); + FixedWindows.of(new Duration(4))) + .withOutputTimeFn(OutputTimeFns.outputAtEarliestInputTimestamp())); PCollection<KV<Integer, CoGbkResult>> coGbkResults = KeyedPCollectionTuple.of(clicksTag, clicksTable) http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/3755c557/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/WindowTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/WindowTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/WindowTest.java index 91bd846..8ad590d 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/WindowTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/WindowTest.java @@ -183,25 +183,34 @@ public class WindowTest implements Serializable { /** * Tests that when two elements are combined via a GroupByKey their output timestamp agrees - * with the windowing function default, the earlier of the two values. + * with the windowing function default, the end of the window. */ @Test @Category(RunnableOnService.class) public void testOutputTimeFnDefault() { Pipeline pipeline = TestPipeline.create(); - pipeline.apply( - Create.timestamped( - TimestampedValue.of(KV.of(0, "hello"), new Instant(0)), - TimestampedValue.of(KV.of(0, "goodbye"), new Instant(10)))) + pipeline + .apply( + Create.timestamped( + TimestampedValue.of(KV.of(0, "hello"), new Instant(0)), + TimestampedValue.of(KV.of(0, "goodbye"), new Instant(10)))) .apply(Window.<KV<Integer, String>>into(FixedWindows.of(Duration.standardMinutes(10)))) .apply(GroupByKey.<Integer, String>create()) - .apply(ParDo.of(new DoFn<KV<Integer, Iterable<String>>, Void>() { - @Override - public void processElement(ProcessContext c) throws Exception { - assertThat(c.timestamp(), equalTo(new Instant(0))); - } - })); + .apply( + ParDo.of( + new DoFn<KV<Integer, Iterable<String>>, Void>() { + @Override + public void processElement(ProcessContext c) throws Exception { + assertThat( + c.timestamp(), + equalTo( + new IntervalWindow( + new Instant(0), + new Instant(0).plus(Duration.standardMinutes(10))) + .maxTimestamp())); + } + })); pipeline.run(); } http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/3755c557/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/WindowingTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/WindowingTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/WindowingTest.java index 5cbf044..65adac1 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/WindowingTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/WindowingTest.java @@ -72,11 +72,12 @@ public class WindowingTest implements Serializable { } @Override public PCollection<String> apply(PCollection<String> in) { - return in - .apply(Window.named("Window").<String>into(windowFn)) + return in.apply( + Window.named("Window") + .<String>into(windowFn) + .withOutputTimeFn(OutputTimeFns.outputAtEarliestInputTimestamp())) .apply(Count.<String>perElement()) - .apply(ParDo - .named("FormatCounts").of(new FormatCountsDoFn())) + .apply(ParDo.named("FormatCounts").of(new FormatCountsDoFn())) .setCoder(StringUtf8Coder.of()); } } http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/3755c557/sdks/java/core/src/test/java/org/apache/beam/sdk/util/GroupAlsoByWindowsProperties.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/GroupAlsoByWindowsProperties.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/GroupAlsoByWindowsProperties.java index d5aa0da..4518f9f 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/GroupAlsoByWindowsProperties.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/GroupAlsoByWindowsProperties.java @@ -29,7 +29,6 @@ import org.apache.beam.sdk.transforms.Combine.CombineFn; import org.apache.beam.sdk.transforms.windowing.BoundedWindow; import org.apache.beam.sdk.transforms.windowing.FixedWindows; import org.apache.beam.sdk.transforms.windowing.IntervalWindow; -import org.apache.beam.sdk.transforms.windowing.OutputTimeFn; import org.apache.beam.sdk.transforms.windowing.OutputTimeFns; import org.apache.beam.sdk.transforms.windowing.PaneInfo; import org.apache.beam.sdk.transforms.windowing.Sessions; @@ -123,12 +122,12 @@ public class GroupAlsoByWindowsProperties { WindowedValue<KV<String, Iterable<String>>> item0 = result.get(0); assertThat(item0.getValue().getValue(), containsInAnyOrder("v1", "v2")); - assertThat(item0.getTimestamp(), equalTo(new Instant(1))); + assertThat(item0.getTimestamp(), equalTo(window(0, 10).maxTimestamp())); assertThat(item0.getWindows(), contains(window(0, 10))); WindowedValue<KV<String, Iterable<String>>> item1 = result.get(1); assertThat(item1.getValue().getValue(), contains("v3")); - assertThat(item1.getTimestamp(), equalTo(new Instant(13))); + assertThat(item1.getTimestamp(), equalTo(window(10, 20).maxTimestamp())); assertThat(item1.getWindows(), contains(window(10, 20))); } @@ -139,12 +138,13 @@ public class GroupAlsoByWindowsProperties { * * <p>In the input here, each element occurs in multiple windows. */ - public static void groupsElementsIntoSlidingWindows( + public static void groupsElementsIntoSlidingWindowsWithMinTimestamp( GroupAlsoByWindowsDoFnFactory<String, String, Iterable<String>> gabwFactory) throws Exception { WindowingStrategy<?, IntervalWindow> windowingStrategy = WindowingStrategy.of( - SlidingWindows.of(Duration.millis(20)).every(Duration.millis(10))); + SlidingWindows.of(Duration.millis(20)).every(Duration.millis(10))) + .withOutputTimeFn(OutputTimeFns.outputAtEarliestInputTimestamp()); List<WindowedValue<KV<String, Iterable<String>>>> result = runGABW(gabwFactory, windowingStrategy, "key", @@ -271,13 +271,13 @@ public class GroupAlsoByWindowsProperties { WindowedValue<KV<String, Iterable<String>>> item0 = result.get(0); assertThat(item0.getValue().getValue(), containsInAnyOrder("v1", "v3")); - assertThat(item0.getTimestamp(), equalTo(new Instant(1))); + assertThat(item0.getTimestamp(), equalTo(window(1, 5).maxTimestamp())); assertThat(item0.getWindows(), contains(window(0, 5))); WindowedValue<KV<String, Iterable<String>>> item1 = result.get(1); assertThat(item1.getValue().getValue(), contains("v2")); - assertThat(item1.getTimestamp(), equalTo(new Instant(4))); + assertThat(item1.getTimestamp(), equalTo(window(0, 5).maxTimestamp())); assertThat(item1.getWindows(), contains(window(1, 5))); } @@ -314,13 +314,13 @@ public class GroupAlsoByWindowsProperties { WindowedValue<KV<String, Iterable<String>>> item0 = result.get(0); assertThat(item0.getValue().getValue(), containsInAnyOrder("v1", "v2")); - assertThat(item0.getTimestamp(), equalTo(new Instant(0))); + assertThat(item0.getTimestamp(), equalTo(window(0, 15).maxTimestamp())); assertThat(item0.getWindows(), contains(window(0, 15))); WindowedValue<KV<String, Iterable<String>>> item1 = result.get(1); assertThat(item1.getValue().getValue(), contains("v3")); - assertThat(item1.getTimestamp(), equalTo(new Instant(15))); + assertThat(item1.getTimestamp(), equalTo(window(15, 25).maxTimestamp())); assertThat(item1.getWindows(), contains(window(15, 25))); } @@ -421,53 +421,6 @@ public class GroupAlsoByWindowsProperties { /** * Tests that for a simple sequence of elements on the same key, the given GABW implementation * correctly groups them according to fixed windows and also sets the output timestamp - * according to a custom {@link OutputTimeFn}. - */ - public static void groupsElementsIntoFixedWindowsWithCustomTimestamp( - GroupAlsoByWindowsDoFnFactory<String, String, Iterable<String>> gabwFactory) - throws Exception { - WindowingStrategy<?, IntervalWindow> windowingStrategy = - WindowingStrategy.of(FixedWindows.of(Duration.millis(10))) - .withOutputTimeFn(new OutputTimeFn.Defaults<IntervalWindow>() { - @Override - public Instant assignOutputTime(Instant inputTimestamp, IntervalWindow window) { - return inputTimestamp.isBefore(window.maxTimestamp()) - ? inputTimestamp.plus(1) : window.maxTimestamp(); - } - - @Override - public Instant combine(Instant outputTime, Instant otherOutputTime) { - return outputTime.isBefore(otherOutputTime) ? outputTime : otherOutputTime; - } - - @Override - public boolean dependsOnlyOnEarliestInputTimestamp() { - return true; - } - }); - - List<WindowedValue<KV<String, Iterable<String>>>> result = runGABW(gabwFactory, - windowingStrategy, "key", - WindowedValue.of("v1", new Instant(1), Arrays.asList(window(0, 10)), PaneInfo.NO_FIRING), - WindowedValue.of("v2", new Instant(2), Arrays.asList(window(0, 10)), PaneInfo.NO_FIRING), - WindowedValue.of("v3", new Instant(13), Arrays.asList(window(10, 20)), PaneInfo.NO_FIRING)); - - assertThat(result.size(), equalTo(2)); - - WindowedValue<KV<String, Iterable<String>>> item0 = result.get(0); - assertThat(item0.getValue().getValue(), containsInAnyOrder("v1", "v2")); - assertThat(item0.getWindows(), contains(window(0, 10))); - assertThat(item0.getTimestamp(), equalTo(new Instant(2))); - - WindowedValue<KV<String, Iterable<String>>> item1 = result.get(1); - assertThat(item1.getValue().getValue(), contains("v3")); - assertThat(item1.getWindows(), contains(window(10, 20))); - assertThat(item1.getTimestamp(), equalTo(new Instant(14))); - } - - /** - * Tests that for a simple sequence of elements on the same key, the given GABW implementation - * correctly groups them according to fixed windows and also sets the output timestamp * according to the policy {@link OutputTimeFns#outputAtLatestInputTimestamp()}. */ public static void groupsElementsIntoFixedWindowsWithLatestTimestamp( http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/3755c557/sdks/java/core/src/test/java/org/apache/beam/sdk/util/GroupAlsoByWindowsViaOutputBufferDoFnTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/GroupAlsoByWindowsViaOutputBufferDoFnTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/GroupAlsoByWindowsViaOutputBufferDoFnTest.java index d9c786d..4ac6164 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/GroupAlsoByWindowsViaOutputBufferDoFnTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/GroupAlsoByWindowsViaOutputBufferDoFnTest.java @@ -64,7 +64,7 @@ public class GroupAlsoByWindowsViaOutputBufferDoFnTest { @Test public void testGroupsElementsIntoSlidingWindows() throws Exception { - GroupAlsoByWindowsProperties.groupsElementsIntoSlidingWindows( + GroupAlsoByWindowsProperties.groupsElementsIntoSlidingWindowsWithMinTimestamp( new BufferingGABWViaOutputBufferDoFnFactory<String, String>(StringUtf8Coder.of())); } @@ -93,12 +93,6 @@ public class GroupAlsoByWindowsViaOutputBufferDoFnTest { } @Test - public void testGroupsElementsIntoFixedWindowsWithCustomTimestamp() throws Exception { - GroupAlsoByWindowsProperties.groupsElementsIntoFixedWindowsWithCustomTimestamp( - new BufferingGABWViaOutputBufferDoFnFactory<String, String>(StringUtf8Coder.of())); - } - - @Test public void testGroupsElementsIntoSessionsWithEndOfWindowTimestamp() throws Exception { GroupAlsoByWindowsProperties.groupsElementsInMergedSessionsWithEndOfWindowTimestamp( new BufferingGABWViaOutputBufferDoFnFactory<String, String>(StringUtf8Coder.of())); http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/3755c557/sdks/java/core/src/test/java/org/apache/beam/sdk/util/ReduceFnRunnerTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/ReduceFnRunnerTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/ReduceFnRunnerTest.java index f2036eb..41c1710 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/ReduceFnRunnerTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/ReduceFnRunnerTest.java @@ -49,6 +49,7 @@ import org.apache.beam.sdk.transforms.windowing.AfterWatermark; import org.apache.beam.sdk.transforms.windowing.BoundedWindow; import org.apache.beam.sdk.transforms.windowing.FixedWindows; import org.apache.beam.sdk.transforms.windowing.IntervalWindow; +import org.apache.beam.sdk.transforms.windowing.OutputTimeFns; import org.apache.beam.sdk.transforms.windowing.PaneInfo; import org.apache.beam.sdk.transforms.windowing.PaneInfo.Timing; import org.apache.beam.sdk.transforms.windowing.Repeatedly; @@ -258,6 +259,7 @@ public class ReduceFnRunnerTest { .of(FixedWindows.of(Duration.millis(10))) .withTrigger(mockTrigger) .withMode(AccumulationMode.DISCARDING_FIRED_PANES) + .withOutputTimeFn(OutputTimeFns.outputAtEarliestInputTimestamp()) .withAllowedLateness(Duration.millis(100)); TestOptions options = PipelineOptionsFactory.as(TestOptions.class); @@ -504,6 +506,7 @@ public class ReduceFnRunnerTest { AfterWatermark.pastEndOfWindow()))) .withMode(AccumulationMode.DISCARDING_FIRED_PANES) .withAllowedLateness(Duration.millis(100)) + .withOutputTimeFn(OutputTimeFns.outputAtEarliestInputTimestamp()) .withClosingBehavior(ClosingBehavior.FIRE_ALWAYS)); tester.advanceInputWatermark(new Instant(0)); @@ -556,6 +559,7 @@ public class ReduceFnRunnerTest { AfterWatermark.pastEndOfWindow()))) .withMode(AccumulationMode.ACCUMULATING_FIRED_PANES) .withAllowedLateness(Duration.millis(100)) + .withOutputTimeFn(OutputTimeFns.outputAtEarliestInputTimestamp()) .withClosingBehavior(ClosingBehavior.FIRE_IF_NON_EMPTY)); tester.advanceInputWatermark(new Instant(0)); @@ -582,6 +586,7 @@ public class ReduceFnRunnerTest { AfterWatermark.pastEndOfWindow()))) .withMode(AccumulationMode.ACCUMULATING_FIRED_PANES) .withAllowedLateness(Duration.millis(100)) + .withOutputTimeFn(OutputTimeFns.outputAtEarliestInputTimestamp()) .withClosingBehavior(ClosingBehavior.FIRE_ALWAYS)); tester.advanceInputWatermark(new Instant(0)); @@ -610,6 +615,7 @@ public class ReduceFnRunnerTest { AfterWatermark.pastEndOfWindow()))) .withMode(AccumulationMode.ACCUMULATING_FIRED_PANES) .withAllowedLateness(Duration.millis(100)) + .withOutputTimeFn(OutputTimeFns.outputAtEarliestInputTimestamp()) .withClosingBehavior(ClosingBehavior.FIRE_ALWAYS)); tester.advanceInputWatermark(new Instant(0)); http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/3755c557/sdks/java/core/src/test/java/org/apache/beam/sdk/util/ReduceFnTester.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/ReduceFnTester.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/ReduceFnTester.java index f296d65..9916c5c 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/ReduceFnTester.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/ReduceFnTester.java @@ -36,6 +36,7 @@ import org.apache.beam.sdk.transforms.CombineWithContext.KeyedCombineFnWithConte import org.apache.beam.sdk.transforms.Sum; import org.apache.beam.sdk.transforms.windowing.BoundedWindow; import org.apache.beam.sdk.transforms.windowing.GlobalWindow; +import org.apache.beam.sdk.transforms.windowing.OutputTimeFns; import org.apache.beam.sdk.transforms.windowing.PaneInfo; import org.apache.beam.sdk.transforms.windowing.Trigger; import org.apache.beam.sdk.transforms.windowing.TriggerBuilder; @@ -134,6 +135,7 @@ public class ReduceFnTester<InputT, OutputT, W extends BoundedWindow> { Duration allowedDataLateness, ClosingBehavior closingBehavior) throws Exception { WindowingStrategy<?, W> strategy = WindowingStrategy.of(windowFn) + .withOutputTimeFn(OutputTimeFns.outputAtEarliestInputTimestamp()) .withTrigger(trigger.buildTrigger()) .withMode(mode) .withAllowedLateness(allowedDataLateness) @@ -185,8 +187,11 @@ public class ReduceFnTester<InputT, OutputT, W extends BoundedWindow> { Duration allowedDataLateness) throws Exception { WindowingStrategy<?, W> strategy = - WindowingStrategy.of(windowFn).withTrigger(trigger).withMode(mode).withAllowedLateness( - allowedDataLateness); + WindowingStrategy.of(windowFn) + .withOutputTimeFn(OutputTimeFns.outputAtEarliestInputTimestamp()) + .withTrigger(trigger) + .withMode(mode) + .withAllowedLateness(allowedDataLateness); return combining(strategy, combineFn, outputCoder); } http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/3755c557/sdks/java/core/src/test/java/org/apache/beam/sdk/util/state/CopyOnAccessInMemoryStateInternalsTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/state/CopyOnAccessInMemoryStateInternalsTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/state/CopyOnAccessInMemoryStateInternalsTest.java index f17f64c..b7388ee 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/state/CopyOnAccessInMemoryStateInternalsTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/state/CopyOnAccessInMemoryStateInternalsTest.java @@ -35,7 +35,6 @@ import org.apache.beam.sdk.coders.VarIntCoder; import org.apache.beam.sdk.testing.TestPipeline; import org.apache.beam.sdk.transforms.Combine.CombineFn; import org.apache.beam.sdk.transforms.Combine.KeyedCombineFn; -import org.apache.beam.sdk.transforms.Create; import org.apache.beam.sdk.transforms.Sum; import org.apache.beam.sdk.transforms.windowing.BoundedWindow; import org.apache.beam.sdk.transforms.windowing.OutputTimeFn; @@ -220,9 +219,8 @@ public class CopyOnAccessInMemoryStateInternalsTest { CopyOnAccessInMemoryStateInternals<String> underlying = CopyOnAccessInMemoryStateInternals.withUnderlying(key, null); - @SuppressWarnings("unchecked") - OutputTimeFn<BoundedWindow> outputTimeFn = (OutputTimeFn<BoundedWindow>) - TestPipeline.create().apply(Create.of("foo")).getWindowingStrategy().getOutputTimeFn(); + OutputTimeFn<BoundedWindow> outputTimeFn = + OutputTimeFns.outputAtEarliestInputTimestamp(); StateNamespace namespace = new StateNamespaceForTest("foo"); StateTag<Object, WatermarkHoldState<BoundedWindow>> stateTag =