Repository: beam Updated Branches: refs/heads/master 918c7fa36 -> 01de25575
Improve check around accumulation mode. If multiple panes can ever be produced, the accumulation mode must be set explicitly, because there is currently no default accumulation mode. A pipeline that uses a non-global WindowFn with a nonzero allowed lateness can produce multiple panes even with the default trigger, because that trigger fires again for every late-arriving element. As a result, the accumulation mode must be set whenever a nonzero allowed lateness is specified, even if no custom trigger is used. Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/89cbf3bf Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/89cbf3bf Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/89cbf3bf Branch: refs/heads/master Commit: 89cbf3bfbbf6677cb294b3d85645f2f90f85414a Parents: 918c7fa Author: Thomas Groh <tg...@google.com> Authored: Fri Feb 17 09:36:41 2017 -0800 Committer: Thomas Groh <tg...@google.com> Committed: Fri Feb 17 19:50:28 2017 -0800 ---------------------------------------------------------------------- .../beam/sdk/transforms/windowing/Window.java | 42 +++++++++++------ .../beam/sdk/transforms/SplittableDoFnTest.java | 3 +- .../sdk/transforms/windowing/WindowTest.java | 48 ++++++++++++++++++-- 3 files changed, 72 insertions(+), 21 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/beam/blob/89cbf3bf/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/Window.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/Window.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/Window.java index faa3704..eac1c97 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/Window.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/Window.java @@ -434,22 +434,34 @@ public class Window { // Make sure that the windowing strategy is complete & valid. 
if (outputStrategy.isTriggerSpecified() - && !(outputStrategy.getTrigger() instanceof DefaultTrigger)) { - if (!(outputStrategy.getWindowFn() instanceof GlobalWindows) - && !outputStrategy.isAllowedLatenessSpecified()) { - throw new IllegalArgumentException("Except when using GlobalWindows," - + " calling .triggering() to specify a trigger requires that the allowed lateness be" - + " specified using .withAllowedLateness() to set the upper bound on how late data" - + " can arrive before being dropped. See Javadoc for more details."); - } - - if (!outputStrategy.isModeSpecified()) { - throw new IllegalArgumentException( - "Calling .triggering() to specify a trigger requires that the accumulation mode be" - + " specified using .discardingFiredPanes() or .accumulatingFiredPanes()." - + " See Javadoc for more details."); - } + && !(outputStrategy.getTrigger() instanceof DefaultTrigger) + && !(outputStrategy.getWindowFn() instanceof GlobalWindows) + && !outputStrategy.isAllowedLatenessSpecified()) { + throw new IllegalArgumentException( + "Except when using GlobalWindows," + + " calling .triggering() to specify a trigger requires that the allowed lateness" + + " be specified using .withAllowedLateness() to set the upper bound on how late" + + " data can arrive before being dropped. See Javadoc for more details."); } + + if (!outputStrategy.isModeSpecified() && canProduceMultiplePanes(outputStrategy)) { + throw new IllegalArgumentException( + "Calling .triggering() to specify a trigger or calling .withAllowedLateness() to" + + " specify an allowed lateness greater than zero requires that the accumulation" + + " mode be specified using .discardingFiredPanes() or .accumulatingFiredPanes()." 
+ + " See Javadoc for more details."); + } + } + + private boolean canProduceMultiplePanes(WindowingStrategy<?, ?> strategy) { + // The default trigger is Repeatedly.forever(AfterWatermark.pastEndOfWindow()); This fires + // for every late-arriving element if allowed lateness is nonzero, and thus we must have + // an accumulating mode specified + boolean dataCanArriveLate = + !(strategy.getWindowFn() instanceof GlobalWindows) + && strategy.getAllowedLateness().getMillis() > 0; + boolean hasCustomTrigger = !(strategy.getTrigger() instanceof DefaultTrigger); + return dataCanArriveLate || hasCustomTrigger; } @Override http://git-wip-us.apache.org/repos/asf/beam/blob/89cbf3bf/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/SplittableDoFnTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/SplittableDoFnTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/SplittableDoFnTest.java index e5c0ef0..fefccc4 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/SplittableDoFnTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/SplittableDoFnTest.java @@ -310,7 +310,8 @@ public class SplittableDoFnTest { p.apply(stream) .apply( Window.<String>into(FixedWindows.of(Duration.standardMinutes(1))) - .withAllowedLateness(Duration.standardMinutes(1))); + .withAllowedLateness(Duration.standardMinutes(1)) + .discardingFiredPanes()); PCollection<KV<String, Integer>> afterSDF = input http://git-wip-us.apache.org/repos/asf/beam/blob/89cbf3bf/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/WindowTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/WindowTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/WindowTest.java index e21668e..55c7297 100644 --- 
a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/WindowTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/WindowTest.java @@ -21,6 +21,7 @@ import static org.apache.beam.sdk.transforms.display.DisplayDataMatchers.hasDisp import static org.apache.beam.sdk.transforms.display.DisplayDataMatchers.hasKey; import static org.apache.beam.sdk.transforms.display.DisplayDataMatchers.includesDisplayDataFor; import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.is; import static org.hamcrest.Matchers.isOneOf; import static org.hamcrest.Matchers.not; import static org.junit.Assert.assertEquals; @@ -42,6 +43,7 @@ import org.apache.beam.sdk.transforms.display.DisplayData; import org.apache.beam.sdk.util.WindowingStrategy; import org.apache.beam.sdk.util.WindowingStrategy.AccumulationMode; import org.apache.beam.sdk.values.KV; +import org.apache.beam.sdk.values.PCollection; import org.apache.beam.sdk.values.TimestampedValue; import org.hamcrest.Matchers; import org.joda.time.Duration; @@ -96,6 +98,26 @@ public class WindowTest implements Serializable { } @Test + public void testWindowIntoAccumulatingLatenessNoTrigger() { + FixedWindows fixed = FixedWindows.of(Duration.standardMinutes(10)); + WindowingStrategy<?, ?> strategy = + pipeline + .apply(Create.of("hello", "world").withCoder(StringUtf8Coder.of())) + .apply( + "Lateness", + Window.<String>into(fixed) + .withAllowedLateness(Duration.standardDays(1)) + .accumulatingFiredPanes()) + .getWindowingStrategy(); + + assertThat(strategy.isTriggerSpecified(), is(false)); + assertThat(strategy.isModeSpecified(), is(true)); + assertThat(strategy.isAllowedLatenessSpecified(), is(true)); + assertThat(strategy.getMode(), equalTo(AccumulationMode.ACCUMULATING_FIRED_PANES)); + assertThat(strategy.getAllowedLateness(), equalTo(Duration.standardDays(1))); + } + + @Test public void testWindowPropagatesEachPart() { FixedWindows fixed10 = 
FixedWindows.of(Duration.standardMinutes(10)); Repeatedly trigger = Repeatedly.forever(AfterPane.elementCountAtLeast(5)); @@ -159,13 +181,29 @@ public class WindowTest implements Serializable { FixedWindows fixed10 = FixedWindows.of(Duration.standardMinutes(10)); Repeatedly trigger = Repeatedly.forever(AfterPane.elementCountAtLeast(5)); + PCollection<String> input = + pipeline + .apply(Create.of("hello", "world").withCoder(StringUtf8Coder.of())) + .apply("Window", Window.<String>into(fixed10)); thrown.expect(IllegalArgumentException.class); thrown.expectMessage("requires that the accumulation mode"); - pipeline - .apply(Create.of("hello", "world").withCoder(StringUtf8Coder.of())) - .apply("Window", Window.<String>into(fixed10)) - .apply("Lateness", Window.<String>withAllowedLateness(Duration.standardDays(1))) - .apply("Trigger", Window.<String>triggering(trigger)); + input.apply( + "Triggering", + Window.<String>withAllowedLateness(Duration.standardDays(1)).triggering(trigger)); + } + + @Test + public void testMissingModeViaLateness() { + FixedWindows fixed = FixedWindows.of(Duration.standardMinutes(10)); + PCollection<String> input = + pipeline + .apply(Create.of("hello", "world").withCoder(StringUtf8Coder.of())) + .apply("Window", Window.<String>into(fixed)); + thrown.expect(IllegalArgumentException.class); + thrown.expectMessage("allowed lateness"); + thrown.expectMessage("accumulation mode be specified"); + input + .apply("Lateness", Window.<String>withAllowedLateness(Duration.standardDays(1))); } @Test