Repository: beam
Updated Branches:
  refs/heads/master 918c7fa36 -> 01de25575


Improve check around accumulation mode

If multiple panes can ever be produced, accumulation mode must be
explicitly set, as there is currently no default accumulation mode. A
pipeline with allowed lateness and the default trigger can produce
multiple panes; as a result, accumulation mode must be set even if there
is no custom trigger if allowed lateness is specified.


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/89cbf3bf
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/89cbf3bf
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/89cbf3bf

Branch: refs/heads/master
Commit: 89cbf3bfbbf6677cb294b3d85645f2f90f85414a
Parents: 918c7fa
Author: Thomas Groh <tg...@google.com>
Authored: Fri Feb 17 09:36:41 2017 -0800
Committer: Thomas Groh <tg...@google.com>
Committed: Fri Feb 17 19:50:28 2017 -0800

----------------------------------------------------------------------
 .../beam/sdk/transforms/windowing/Window.java   | 42 +++++++++++------
 .../beam/sdk/transforms/SplittableDoFnTest.java |  3 +-
 .../sdk/transforms/windowing/WindowTest.java    | 48 ++++++++++++++++++--
 3 files changed, 72 insertions(+), 21 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/89cbf3bf/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/Window.java
----------------------------------------------------------------------
diff --git 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/Window.java
 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/Window.java
index faa3704..eac1c97 100644
--- 
a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/Window.java
+++ 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/Window.java
@@ -434,22 +434,34 @@ public class Window {
 
       // Make sure that the windowing strategy is complete & valid.
       if (outputStrategy.isTriggerSpecified()
-          && !(outputStrategy.getTrigger() instanceof DefaultTrigger)) {
-        if (!(outputStrategy.getWindowFn() instanceof GlobalWindows)
-            && !outputStrategy.isAllowedLatenessSpecified()) {
-          throw new IllegalArgumentException("Except when using GlobalWindows,"
-              + " calling .triggering() to specify a trigger requires that the 
allowed lateness be"
-              + " specified using .withAllowedLateness() to set the upper 
bound on how late data"
-              + " can arrive before being dropped. See Javadoc for more 
details.");
-        }
-
-        if (!outputStrategy.isModeSpecified()) {
-          throw new IllegalArgumentException(
-              "Calling .triggering() to specify a trigger requires that the 
accumulation mode be"
-              + " specified using .discardingFiredPanes() or 
.accumulatingFiredPanes()."
-              + " See Javadoc for more details.");
-        }
+          && !(outputStrategy.getTrigger() instanceof DefaultTrigger)
+          && !(outputStrategy.getWindowFn() instanceof GlobalWindows)
+          && !outputStrategy.isAllowedLatenessSpecified()) {
+        throw new IllegalArgumentException(
+            "Except when using GlobalWindows,"
+                + " calling .triggering() to specify a trigger requires that 
the allowed lateness"
+                + " be specified using .withAllowedLateness() to set the upper 
bound on how late"
+                + " data can arrive before being dropped. See Javadoc for more 
details.");
       }
+
+      if (!outputStrategy.isModeSpecified() && 
canProduceMultiplePanes(outputStrategy)) {
+        throw new IllegalArgumentException(
+            "Calling .triggering() to specify a trigger or calling 
.withAllowedLateness() to"
+                + " specify an allowed lateness greater than zero requires 
that the accumulation"
+                + " mode be specified using .discardingFiredPanes() or 
.accumulatingFiredPanes()."
+                + " See Javadoc for more details.");
+      }
+    }
+
+    private boolean canProduceMultiplePanes(WindowingStrategy<?, ?> strategy) {
+      // The default trigger is 
Repeatedly.forever(AfterWatermark.pastEndOfWindow()); This fires
+      // for every late-arriving element if allowed lateness is nonzero, and 
thus we must have
+      // an accumulating mode specified
+      boolean dataCanArriveLate =
+          !(strategy.getWindowFn() instanceof GlobalWindows)
+              && strategy.getAllowedLateness().getMillis() > 0;
+      boolean hasCustomTrigger = !(strategy.getTrigger() instanceof 
DefaultTrigger);
+      return dataCanArriveLate || hasCustomTrigger;
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/beam/blob/89cbf3bf/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/SplittableDoFnTest.java
----------------------------------------------------------------------
diff --git 
a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/SplittableDoFnTest.java
 
b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/SplittableDoFnTest.java
index e5c0ef0..fefccc4 100644
--- 
a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/SplittableDoFnTest.java
+++ 
b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/SplittableDoFnTest.java
@@ -310,7 +310,8 @@ public class SplittableDoFnTest {
         p.apply(stream)
             .apply(
                 
Window.<String>into(FixedWindows.of(Duration.standardMinutes(1)))
-                    .withAllowedLateness(Duration.standardMinutes(1)));
+                    .withAllowedLateness(Duration.standardMinutes(1))
+                    .discardingFiredPanes());
 
     PCollection<KV<String, Integer>> afterSDF =
         input

http://git-wip-us.apache.org/repos/asf/beam/blob/89cbf3bf/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/WindowTest.java
----------------------------------------------------------------------
diff --git 
a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/WindowTest.java
 
b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/WindowTest.java
index e21668e..55c7297 100644
--- 
a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/WindowTest.java
+++ 
b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/WindowTest.java
@@ -21,6 +21,7 @@ import static 
org.apache.beam.sdk.transforms.display.DisplayDataMatchers.hasDisp
 import static 
org.apache.beam.sdk.transforms.display.DisplayDataMatchers.hasKey;
 import static 
org.apache.beam.sdk.transforms.display.DisplayDataMatchers.includesDisplayDataFor;
 import static org.hamcrest.Matchers.equalTo;
+import static org.hamcrest.Matchers.is;
 import static org.hamcrest.Matchers.isOneOf;
 import static org.hamcrest.Matchers.not;
 import static org.junit.Assert.assertEquals;
@@ -42,6 +43,7 @@ import org.apache.beam.sdk.transforms.display.DisplayData;
 import org.apache.beam.sdk.util.WindowingStrategy;
 import org.apache.beam.sdk.util.WindowingStrategy.AccumulationMode;
 import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.PCollection;
 import org.apache.beam.sdk.values.TimestampedValue;
 import org.hamcrest.Matchers;
 import org.joda.time.Duration;
@@ -96,6 +98,26 @@ public class WindowTest implements Serializable {
   }
 
   @Test
+  public void testWindowIntoAccumulatingLatenessNoTrigger() {
+    FixedWindows fixed = FixedWindows.of(Duration.standardMinutes(10));
+    WindowingStrategy<?, ?> strategy =
+        pipeline
+            .apply(Create.of("hello", "world").withCoder(StringUtf8Coder.of()))
+            .apply(
+                "Lateness",
+                Window.<String>into(fixed)
+                    .withAllowedLateness(Duration.standardDays(1))
+                    .accumulatingFiredPanes())
+            .getWindowingStrategy();
+
+    assertThat(strategy.isTriggerSpecified(), is(false));
+    assertThat(strategy.isModeSpecified(), is(true));
+    assertThat(strategy.isAllowedLatenessSpecified(), is(true));
+    assertThat(strategy.getMode(), 
equalTo(AccumulationMode.ACCUMULATING_FIRED_PANES));
+    assertThat(strategy.getAllowedLateness(), 
equalTo(Duration.standardDays(1)));
+  }
+
+  @Test
   public void testWindowPropagatesEachPart() {
     FixedWindows fixed10 = FixedWindows.of(Duration.standardMinutes(10));
     Repeatedly trigger = Repeatedly.forever(AfterPane.elementCountAtLeast(5));
@@ -159,13 +181,29 @@ public class WindowTest implements Serializable {
     FixedWindows fixed10 = FixedWindows.of(Duration.standardMinutes(10));
     Repeatedly trigger = Repeatedly.forever(AfterPane.elementCountAtLeast(5));
 
+    PCollection<String> input =
+        pipeline
+            .apply(Create.of("hello", "world").withCoder(StringUtf8Coder.of()))
+            .apply("Window", Window.<String>into(fixed10));
     thrown.expect(IllegalArgumentException.class);
     thrown.expectMessage("requires that the accumulation mode");
-    pipeline
-      .apply(Create.of("hello", "world").withCoder(StringUtf8Coder.of()))
-      .apply("Window", Window.<String>into(fixed10))
-      .apply("Lateness", 
Window.<String>withAllowedLateness(Duration.standardDays(1)))
-      .apply("Trigger", Window.<String>triggering(trigger));
+    input.apply(
+        "Triggering",
+        
Window.<String>withAllowedLateness(Duration.standardDays(1)).triggering(trigger));
+  }
+
+  @Test
+  public void testMissingModeViaLateness() {
+    FixedWindows fixed = FixedWindows.of(Duration.standardMinutes(10));
+    PCollection<String> input =
+        pipeline
+            .apply(Create.of("hello", "world").withCoder(StringUtf8Coder.of()))
+            .apply("Window", Window.<String>into(fixed));
+    thrown.expect(IllegalArgumentException.class);
+    thrown.expectMessage("allowed lateness");
+    thrown.expectMessage("accumulation mode be specified");
+    input
+        .apply("Lateness", 
Window.<String>withAllowedLateness(Duration.standardDays(1)));
   }
 
   @Test

Reply via email to