Repository: incubator-beam Updated Branches: refs/heads/master c8ed39806 -> 9ff426964
Add display data to windowing transforms Expose NeverTrigger as package-private since it is necessary for display data Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/450dd856 Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/450dd856 Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/450dd856 Branch: refs/heads/master Commit: 450dd856f00fc898bb9d2bdc1a5331f13a6c52c1 Parents: c8ed398 Author: Scott Wegner <sweg...@google.com> Authored: Mon Apr 11 15:38:39 2016 -0700 Committer: bchambers <bchamb...@google.com> Committed: Fri Apr 15 11:27:39 2016 -0700 ---------------------------------------------------------------------- .../beam/sdk/transforms/windowing/AfterAll.java | 10 +++ .../windowing/AfterDelayFromFirstElement.java | 11 ++- .../sdk/transforms/windowing/AfterEach.java | 10 +++ .../sdk/transforms/windowing/AfterFirst.java | 10 +++ .../windowing/AfterProcessingTime.java | 11 ++- .../transforms/windowing/AfterWatermark.java | 24 ++++++- .../transforms/windowing/CalendarWindows.java | 35 ++++++++-- .../sdk/transforms/windowing/FixedWindows.java | 8 +++ .../beam/sdk/transforms/windowing/Never.java | 3 +- .../transforms/windowing/OrFinallyTrigger.java | 5 ++ .../sdk/transforms/windowing/Repeatedly.java | 7 +- .../beam/sdk/transforms/windowing/Sessions.java | 6 ++ .../transforms/windowing/SlidingWindows.java | 9 +++ .../beam/sdk/transforms/windowing/Window.java | 25 +++++++ .../beam/sdk/transforms/windowing/WindowFn.java | 14 +++- .../apache/beam/sdk/util/ReshuffleTrigger.java | 5 ++ .../sdk/transforms/windowing/AfterAllTest.java | 5 ++ .../sdk/transforms/windowing/AfterEachTest.java | 10 +++ .../transforms/windowing/AfterFirstTest.java | 6 ++ .../sdk/transforms/windowing/AfterPaneTest.java | 6 ++ .../windowing/AfterProcessingTimeTest.java | 31 +++++++++ .../windowing/AfterWatermarkTest.java | 27 ++++++++ .../windowing/CalendarWindowsTest.java | 31 +++++++++ .../transforms/windowing/FixedWindowsTest.java | 14 ++++ .../windowing/OrFinallyTriggerTest.java | 6 ++ .../transforms/windowing/RepeatedlyTest.java | 13 ++++ .../sdk/transforms/windowing/SessionsTest.java | 10 +++ .../windowing/SlidingWindowsTest.java | 20 ++++++ .../sdk/transforms/windowing/StubTrigger.java | 71 ++++++++++++++++++++ .../sdk/transforms/windowing/TriggerTest.java | 2 +- .../sdk/transforms/windowing/WindowTest.java | 59 ++++++++++++++++ .../beam/sdk/util/ReshuffleTriggerTest.java | 10 ++- 32 files changed, 501 insertions(+), 13 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/450dd856/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/AfterAll.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/AfterAll.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/AfterAll.java index ac1fa43..0f609df 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/AfterAll.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/AfterAll.java @@ -21,6 +21,7 @@ import org.apache.beam.sdk.annotations.Experimental; import org.apache.beam.sdk.transforms.windowing.Trigger.OnceTrigger; import org.apache.beam.sdk.util.ExecutableTrigger; +import com.google.common.base.Joiner; import com.google.common.base.Preconditions; import org.joda.time.Instant; @@ -112,4 +113,13 @@ public class AfterAll extends OnceTrigger { subtrigger.invokeOnFire(context); } } + + @Override + public String toString() { + StringBuilder builder = new StringBuilder("AfterAll.of("); + Joiner.on(", ").appendTo(builder, subTriggers); + builder.append(")"); + + return builder.toString(); + } } http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/450dd856/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/AfterDelayFromFirstElement.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/AfterDelayFromFirstElement.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/AfterDelayFromFirstElement.java index 83e0bea..7ec3ce9 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/AfterDelayFromFirstElement.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/AfterDelayFromFirstElement.java @@ -36,10 +36,12 @@ import com.google.common.collect.ImmutableList; import org.joda.time.Duration; import org.joda.time.Instant; +import org.joda.time.format.PeriodFormat; +import org.joda.time.format.PeriodFormatter; import java.util.List; +import java.util.Locale; import java.util.Objects; - import javax.annotation.Nullable; /** @@ -59,6 +61,8 @@ public abstract class AfterDelayFromFirstElement extends OnceTrigger { StateTags.makeSystemTagInternal(StateTags.combiningValueFromInputInternal( "delayed", InstantCoder.of(), Min.MinFn.<Instant>naturalOrder())); + private static final PeriodFormatter PERIOD_FORMATTER = PeriodFormat.wordBased(Locale.ENGLISH); + /** * To complete an implementation, return the desired time from the TriggerContext. */ @@ -276,6 +280,11 @@ public abstract class AfterDelayFromFirstElement extends OnceTrigger { public int hashCode() { return Objects.hash(delay); } + + @Override + public String toString() { + return PERIOD_FORMATTER.print(delay.toPeriod()); + } } /** http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/450dd856/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/AfterEach.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/AfterEach.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/AfterEach.java index 0c80851..59cb73c 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/AfterEach.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/AfterEach.java @@ -22,6 +22,7 @@ import static com.google.common.base.Preconditions.checkArgument; import org.apache.beam.sdk.annotations.Experimental; import org.apache.beam.sdk.util.ExecutableTrigger; +import com.google.common.base.Joiner; import org.joda.time.Instant; import java.util.Arrays; @@ -127,6 +128,15 @@ public class AfterEach extends Trigger { updateFinishedState(context); } + @Override + public String toString() { + StringBuilder builder = new StringBuilder("AfterEach.inOrder("); + Joiner.on(", ").appendTo(builder, subTriggers); + builder.append(")"); + + return builder.toString(); + } + private void updateFinishedState(TriggerContext context) { context.trigger().setFinished(context.trigger().firstUnfinishedSubTrigger() == null); } http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/450dd856/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/AfterFirst.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/AfterFirst.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/AfterFirst.java index 1462ec4..a8508a3 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/AfterFirst.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/AfterFirst.java @@ -21,6 +21,7 @@ import org.apache.beam.sdk.annotations.Experimental; import org.apache.beam.sdk.transforms.windowing.Trigger.OnceTrigger; import org.apache.beam.sdk.util.ExecutableTrigger; +import com.google.common.base.Joiner; import com.google.common.base.Preconditions; import org.joda.time.Instant; @@ -108,6 +109,15 @@ public class AfterFirst extends OnceTrigger { } } + @Override + public String toString() { + StringBuilder builder = new StringBuilder("AfterFirst.of("); + Joiner.on(", ").appendTo(builder, subTriggers); + builder.append(")"); + + return builder.toString(); + } + private void updateFinishedStatus(TriggerContext c) { boolean anyFinished = false; for (ExecutableTrigger subTrigger : c.trigger().subTriggers()) { http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/450dd856/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/AfterProcessingTime.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/AfterProcessingTime.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/AfterProcessingTime.java index b2ea1b4..05c6815 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/AfterProcessingTime.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/AfterProcessingTime.java @@ -25,7 +25,6 @@ import org.joda.time.Instant; import java.util.List; import java.util.Objects; - import javax.annotation.Nullable; /** @@ -74,7 +73,15 @@ public class AfterProcessingTime extends AfterDelayFromFirstElement { @Override public String toString() { - return "AfterProcessingTime.pastFirstElementInPane(" + timestampMappers + ")"; + StringBuilder builder = new StringBuilder("AfterProcessingTime.pastFirstElementInPane()"); + for (SerializableFunction<Instant, Instant> delayFn : timestampMappers) { + builder + .append(".plusDelayOf(") + .append(delayFn) + .append(")"); + } + + return builder.toString(); } @Override http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/450dd856/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/AfterWatermark.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/AfterWatermark.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/AfterWatermark.java index 05c6eb8..e48cc44 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/AfterWatermark.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/AfterWatermark.java @@ -63,6 +63,8 @@ import java.util.Objects; @Experimental(Experimental.Kind.TRIGGER) public class AfterWatermark { + private static final String TO_STRING = "AfterWatermark.pastEndOfWindow()"; + // Static factory class. private AfterWatermark() {} @@ -220,6 +222,26 @@ public class AfterWatermark { } } + @Override + public String toString() { + StringBuilder builder = new StringBuilder(TO_STRING); + + Trigger earlyTrigger = subTriggers.get(EARLY_INDEX); + if (!(earlyTrigger instanceof Never.NeverTrigger)) { + builder + .append(".withEarlyFirings(") + .append(earlyTrigger) + .append(")"); + } + + builder + .append(".withLateFirings(") + .append(subTriggers.get(LATE_INDEX)) + .append(")"); + + return builder.toString(); + } + private void onNonLateFiring(Trigger.TriggerContext context) throws Exception { // We have not yet transitioned to late firings. ExecutableTrigger earlySubtrigger = context.trigger().subTrigger(EARLY_INDEX); @@ -328,7 +350,7 @@ public class AfterWatermark { @Override public String toString() { - return "AfterWatermark.pastEndOfWindow()"; + return TO_STRING; } @Override http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/450dd856/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/CalendarWindows.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/CalendarWindows.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/CalendarWindows.java index 794d5fd..8656537 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/CalendarWindows.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/CalendarWindows.java @@ -18,6 +18,7 @@ package org.apache.beam.sdk.transforms.windowing; import org.apache.beam.sdk.coders.Coder; +import org.apache.beam.sdk.transforms.display.DisplayData; import org.joda.time.DateTime; import org.joda.time.DateTimeZone; @@ -35,6 +36,8 @@ import org.joda.time.Years; */ public class CalendarWindows { + private static final DateTime DEFAULT_START_DATE = new DateTime(0, DateTimeZone.UTC); + /** * Returns a {@link WindowFn} that windows elements into periods measured by days. * @@ -42,7 +45,7 @@ public class CalendarWindows { * separate windows for each day. */ public static DaysWindows days(int number) { - return new DaysWindows(number, new DateTime(0, DateTimeZone.UTC), DateTimeZone.UTC); + return new DaysWindows(number, DEFAULT_START_DATE, DateTimeZone.UTC); } /** @@ -54,7 +57,7 @@ public class CalendarWindows { public static DaysWindows weeks(int number, int startDayOfWeek) { return new DaysWindows( 7 * number, - new DateTime(0, DateTimeZone.UTC).withDayOfWeek(startDayOfWeek), + DEFAULT_START_DATE.withDayOfWeek(startDayOfWeek), DateTimeZone.UTC); } @@ -67,7 +70,7 @@ public class CalendarWindows { * and the first window begins in January 2014. */ public static MonthsWindows months(int number) { - return new MonthsWindows(number, 1, new DateTime(0, DateTimeZone.UTC), DateTimeZone.UTC); + return new MonthsWindows(number, 1, DEFAULT_START_DATE, DateTimeZone.UTC); } /** @@ -79,7 +82,7 @@ public class CalendarWindows { * America/Los_Angeles time zone. */ public static YearsWindows years(int number) { - return new YearsWindows(number, 1, 1, new DateTime(0, DateTimeZone.UTC), DateTimeZone.UTC); + return new YearsWindows(number, 1, 1, DEFAULT_START_DATE, DateTimeZone.UTC); } /** @@ -142,6 +145,14 @@ public class CalendarWindows { && timeZone == that.timeZone; } + @Override + public void populateDisplayData(DisplayData.Builder builder) { + builder + .add("numDays", number) + .addIfNotDefault("startDate", new DateTime(startDate, timeZone).toInstant(), + new DateTime(DEFAULT_START_DATE, DateTimeZone.UTC).toInstant()); + } + public int getNumber() { return number; } @@ -229,6 +240,14 @@ public class CalendarWindows { && timeZone == that.timeZone; } + @Override + public void populateDisplayData(DisplayData.Builder builder) { + builder + .add("numMonths", number) + .addIfNotDefault("startDate", new DateTime(startDate, timeZone).toInstant(), + new DateTime(DEFAULT_START_DATE, DateTimeZone.UTC).toInstant()); + } + public int getNumber() { return number; } @@ -325,6 +344,14 @@ public class CalendarWindows { && timeZone == that.timeZone; } + @Override + public void populateDisplayData(DisplayData.Builder builder) { + builder + .add("numYears", number) + .addIfNotDefault("startDate", new DateTime(startDate, timeZone).toInstant(), + new DateTime(DEFAULT_START_DATE, DateTimeZone.UTC).toInstant()); + } + public DateTimeZone getTimeZone() { return timeZone; } http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/450dd856/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/FixedWindows.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/FixedWindows.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/FixedWindows.java index cc43887..bba1f3b 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/FixedWindows.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/FixedWindows.java @@ -18,6 +18,7 @@ package org.apache.beam.sdk.transforms.windowing; import org.apache.beam.sdk.coders.Coder; +import org.apache.beam.sdk.transforms.display.DisplayData; import org.joda.time.Duration; import org.joda.time.Instant; @@ -83,6 +84,13 @@ public class FixedWindows extends PartitioningWindowFn<Object, IntervalWindow> { } @Override + public void populateDisplayData(DisplayData.Builder builder) { + builder + .add("size", size) + .addIfNotDefault("offset", offset, Duration.ZERO); + } + + @Override public Coder<IntervalWindow> windowCoder() { return IntervalWindow.getCoder(); } http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/450dd856/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/Never.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/Never.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/Never.java index 809e841..8e3e664 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/Never.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/Never.java @@ -41,7 +41,8 @@ public final class Never { return new NeverTrigger(); } - private static class NeverTrigger extends OnceTrigger { + // package-private in order to check identity for string formatting. + static class NeverTrigger extends OnceTrigger { protected NeverTrigger() { super(null); } http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/450dd856/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/OrFinallyTrigger.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/OrFinallyTrigger.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/OrFinallyTrigger.java index 48e1dc2..c48f5f4 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/OrFinallyTrigger.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/OrFinallyTrigger.java @@ -93,6 +93,11 @@ class OrFinallyTrigger extends Trigger { updateFinishedState(context); } + @Override + public String toString() { + return String.format("%s.orFinally(%s)", subTriggers.get(ACTUAL), subTriggers.get(UNTIL)); + } + private void updateFinishedState(TriggerContext c) throws Exception { boolean anyStillFinished = false; for (ExecutableTrigger subTrigger : c.trigger().subTriggers()) { http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/450dd856/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/Repeatedly.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/Repeatedly.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/Repeatedly.java index 414f107..ec79cf9 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/Repeatedly.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/Repeatedly.java @@ -40,7 +40,7 @@ public class Repeatedly extends Trigger { private static final int REPEATED = 0; /** - * Create a composite trigger that repeatedly executes the trigger {@code toRepeat}, firing each + * Create a composite trigger that repeatedly executes the trigger {@code repeated}, firing each * time it fires and ignoring any indications to finish. * * <p>Unless used with {@link Trigger#orFinally} the composite trigger will never finish. @@ -92,6 +92,11 @@ public class Repeatedly extends Trigger { } } + @Override + public String toString() { + return String.format("Repeatedly.forever(%s)", subTriggers.get(REPEATED)); + } + private ExecutableTrigger getRepeated(TriggerContext context) { return context.trigger().subTrigger(REPEATED); } http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/450dd856/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/Sessions.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/Sessions.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/Sessions.java index 3be6454..74ca268 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/Sessions.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/Sessions.java @@ -20,6 +20,7 @@ package org.apache.beam.sdk.transforms.windowing; import org.apache.beam.sdk.annotations.Experimental; import org.apache.beam.sdk.annotations.Experimental.Kind; import org.apache.beam.sdk.coders.Coder; +import org.apache.beam.sdk.transforms.display.DisplayData; import org.joda.time.Duration; @@ -98,6 +99,11 @@ public class Sessions extends WindowFn<Object, IntervalWindow> { } @Override + public void populateDisplayData(DisplayData.Builder builder) { + builder.add("gapDuration", gapDuration); + } + + @Override public boolean equals(Object object) { if (!(object instanceof Sessions)) { return false; http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/450dd856/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/SlidingWindows.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/SlidingWindows.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/SlidingWindows.java index 3a7b072..abb4078 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/SlidingWindows.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/SlidingWindows.java @@ -20,6 +20,7 @@ package org.apache.beam.sdk.transforms.windowing; import org.apache.beam.sdk.annotations.Experimental; import org.apache.beam.sdk.annotations.Experimental.Kind; import org.apache.beam.sdk.coders.Coder; +import org.apache.beam.sdk.transforms.display.DisplayData; import org.joda.time.Duration; import org.joda.time.Instant; @@ -139,6 +140,14 @@ public class SlidingWindows extends NonMergingWindowFn<Object, IntervalWindow> { return equals(other); } + @Override + public void populateDisplayData(DisplayData.Builder builder) { + builder + .add("size", size) + .add("period", period) + .add("offset", offset); + } + /** * Return the last start of a sliding window that contains the timestamp. */ http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/450dd856/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/Window.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/Window.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/Window.java index 2c162f2..da512b8 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/Window.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/Window.java @@ -25,6 +25,7 @@ import org.apache.beam.sdk.transforms.DoFn; import org.apache.beam.sdk.transforms.GroupByKey; import org.apache.beam.sdk.transforms.PTransform; import org.apache.beam.sdk.transforms.ParDo; +import org.apache.beam.sdk.transforms.display.DisplayData; import org.apache.beam.sdk.util.WindowingStrategy; import org.apache.beam.sdk.util.WindowingStrategy.AccumulationMode; import org.apache.beam.sdk.values.PCollection; @@ -596,6 +597,30 @@ public class Window { } @Override + public void populateDisplayData(DisplayData.Builder builder) { + builder + .add("windowFn", windowFn.getClass()) + .include(windowFn) + .addIfNotNull("allowedLateness", allowedLateness); + + if (trigger != null && !(trigger instanceof DefaultTrigger)) { + builder.add("trigger", trigger.toString()); + } + + if (mode != null) { + builder.add("accumulationMode", mode.toString()); + } + + if (closingBehavior != null) { + builder.add("closingBehavior", closingBehavior.toString()); + } + + if (outputTimeFn != null) { + builder.add("outputTimeFn", outputTimeFn.getClass()); + } + } + + @Override protected Coder<?> getDefaultOutputCoder(PCollection<T> input) { return input.getCoder(); } http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/450dd856/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/WindowFn.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/WindowFn.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/WindowFn.java index 1dda7d0..2eac936 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/WindowFn.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/windowing/WindowFn.java @@ -20,6 +20,8 @@ package org.apache.beam.sdk.transforms.windowing; import org.apache.beam.sdk.annotations.Experimental; import org.apache.beam.sdk.annotations.Experimental.Kind; import org.apache.beam.sdk.coders.Coder; +import org.apache.beam.sdk.transforms.display.DisplayData; +import org.apache.beam.sdk.transforms.display.HasDisplayData; import org.apache.beam.sdk.util.WindowingStrategy; import com.google.common.collect.Ordering; @@ -50,7 +52,7 @@ import java.util.Collection; * windows used by this {@code WindowFn} */ public abstract class WindowFn<T, W extends BoundedWindow> - implements Serializable { + implements Serializable, HasDisplayData { /** * Information available when running {@link #assignWindows}. */ @@ -179,6 +181,16 @@ public abstract class WindowFn<T, W extends BoundedWindow> } /** + * {@inheritDoc} + * + * <p>By default, does not register any display data. Implementors may override this method + * to provide their own display metadata. + */ + @Override + public void populateDisplayData(DisplayData.Builder builder) { + } + + /** * A compatibility adapter that will return the assigned timestamps according to the * {@link WindowFn}, which was the prior policy. Specifying the assigned output timestamps * on the {@link WindowFn} is now deprecated. http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/450dd856/sdks/java/core/src/main/java/org/apache/beam/sdk/util/ReshuffleTrigger.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/ReshuffleTrigger.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/ReshuffleTrigger.java index e5168b2..0a47634 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/util/ReshuffleTrigger.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/util/ReshuffleTrigger.java @@ -60,4 +60,9 @@ public class ReshuffleTrigger<W extends BoundedWindow> extends Trigger { @Override public void onFire(Trigger.TriggerContext context) throws Exception { } + + @Override + public String toString() { + return "ReshuffleTrigger()"; + } } http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/450dd856/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/AfterAllTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/AfterAllTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/AfterAllTest.java index 9ab4e5c..969c1fe 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/AfterAllTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/AfterAllTest.java @@ -149,4 +149,9 @@ public class AfterAllTest { afterAll.getContinuationTrigger()); } + @Test + public void testToString() { + Trigger trigger = AfterAll.of(StubTrigger.named("t1"), StubTrigger.named("t2")); + assertEquals("AfterAll.of(t1, t2)", trigger.toString()); + } } http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/450dd856/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/AfterEachTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/AfterEachTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/AfterEachTest.java index d3d43c9..f5d83a7 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/AfterEachTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/AfterEachTest.java @@ -120,4 +120,14 @@ public class AfterEachTest { trigger1.getContinuationTrigger(), trigger2.getContinuationTrigger())), afterEach.getContinuationTrigger()); } + + @Test + public void testToString() { + Trigger trigger = AfterEach.inOrder( + StubTrigger.named("t1"), + StubTrigger.named("t2"), + StubTrigger.named("t3")); + + assertEquals("AfterEach.inOrder(t1, t2, t3)", trigger.toString()); + } } http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/450dd856/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/AfterFirstTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/AfterFirstTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/AfterFirstTest.java index d70c4d6..c0a9f2b 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/AfterFirstTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/AfterFirstTest.java @@ -173,4 +173,10 @@ public class AfterFirstTest { AfterFirst.of(trigger1.getContinuationTrigger(), trigger2.getContinuationTrigger()), afterFirst.getContinuationTrigger()); } + + @Test + public void testToString() { + Trigger trigger = AfterFirst.of(StubTrigger.named("t1"), StubTrigger.named("t2")); + assertEquals("AfterFirst.of(t1, t2)", trigger.toString()); + } } http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/450dd856/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/AfterPaneTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/AfterPaneTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/AfterPaneTest.java index eeda9ed..827d4c6 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/AfterPaneTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/AfterPaneTest.java @@ -124,4 +124,10 @@ public class AfterPaneTest { AfterPane.elementCountAtLeast(1), AfterPane.elementCountAtLeast(100).getContinuationTrigger().getContinuationTrigger()); } + + @Test + public void testToString() { + Trigger trigger = AfterPane.elementCountAtLeast(5); + assertEquals("AfterPane.elementCountAtLeast(5)", trigger.toString()); + } } http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/450dd856/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/AfterProcessingTimeTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/AfterProcessingTimeTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/AfterProcessingTimeTest.java index 4245e78..81aad33 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/AfterProcessingTimeTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/AfterProcessingTimeTest.java @@ -155,4 +155,35 @@ public class AfterProcessingTimeTest { .plusDelayOf(Duration.standardMinutes(1L)); assertTrue(t1.isCompatible(t2)); } + + @Test + public void testToString() { + Trigger trigger = AfterProcessingTime.pastFirstElementInPane(); + assertEquals("AfterProcessingTime.pastFirstElementInPane()", trigger.toString()); + } + + @Test + public void testWithDelayToString() { + Trigger trigger = AfterProcessingTime.pastFirstElementInPane() + .plusDelayOf(Duration.standardMinutes(5)); + + assertEquals("AfterProcessingTime.pastFirstElementInPane().plusDelayOf(5 minutes)", + trigger.toString()); + } + + @Test + public void testBuiltUpToString() { + Trigger trigger = AfterWatermark.pastEndOfWindow() + .withLateFirings(AfterProcessingTime + .pastFirstElementInPane() + .plusDelayOf(Duration.standardMinutes(10))) + .buildTrigger(); + + String expected = "AfterWatermark.pastEndOfWindow()" + + ".withLateFirings(AfterProcessingTime" + + ".pastFirstElementInPane()" + + ".plusDelayOf(10 minutes))"; + + assertEquals(expected, trigger.toString()); + } } http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/450dd856/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/AfterWatermarkTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/AfterWatermarkTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/AfterWatermarkTest.java index 00e2d22..ef84714 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/AfterWatermarkTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/AfterWatermarkTest.java @@ -17,6 +17,7 @@ */ package org.apache.beam.sdk.transforms.windowing; +import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertTrue; import static org.mockito.Mockito.doNothing; @@ -336,4 +337,30 @@ public class AfterWatermarkTest { tester.advanceInputWatermark(new Instant(15)); assertTrue(tester.shouldFire(mergedWindow)); } + + @Test + public void testFromEndOfWindowToString() { + Trigger trigger = AfterWatermark.pastEndOfWindow(); + assertEquals("AfterWatermark.pastEndOfWindow()", trigger.toString()); + } + + @Test + public void testLateFiringsToString() { + Trigger trigger = AfterWatermark.pastEndOfWindow() + .withLateFirings(StubTrigger.named("t1")) + .buildTrigger(); + + assertEquals("AfterWatermark.pastEndOfWindow().withLateFirings(t1)", trigger.toString()); + } + + @Test + public void testEarlyAndLateFiringsToString() { + Trigger trigger = AfterWatermark.pastEndOfWindow() + .withEarlyFirings(StubTrigger.named("t1")) + .withLateFirings(StubTrigger.named("t2")) + .buildTrigger(); + + assertEquals("AfterWatermark.pastEndOfWindow().withEarlyFirings(t1).withLateFirings(t2)", + trigger.toString()); + } } http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/450dd856/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/CalendarWindowsTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/CalendarWindowsTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/CalendarWindowsTest.java index 31e9a94..4598a27 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/CalendarWindowsTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/CalendarWindowsTest.java @@ -19,9 +19,13 @@ package org.apache.beam.sdk.transforms.windowing; import static org.apache.beam.sdk.testing.WindowFnTestUtils.runWindowFn; import static org.apache.beam.sdk.testing.WindowFnTestUtils.set; +import static org.apache.beam.sdk.transforms.display.DisplayDataMatchers.hasDisplayItem; +import static org.hamcrest.MatcherAssert.assertThat; import static org.junit.Assert.assertEquals; +import org.apache.beam.sdk.transforms.display.DisplayData; + import org.joda.time.DateTime; import org.joda.time.DateTimeConstants; import org.joda.time.DateTimeZone; @@ -259,4 +263,31 @@ public class CalendarWindowsTest { CalendarWindows.days(1).withTimeZone(timeZone), timestamps)); } + + @Test + public void testDisplayData() { + DateTimeZone timeZone = DateTimeZone.forID("America/Los_Angeles"); + Instant jan1 = new DateTime(1990, 1, 1, 0, 0, timeZone).toInstant(); + + CalendarWindows.DaysWindows daysWindow = CalendarWindows.days(5) + .withStartingDay(1990, 1, 1) + .withTimeZone(timeZone); + DisplayData daysDisplayData = DisplayData.from(daysWindow); + assertThat(daysDisplayData, hasDisplayItem("numDays", 5)); + assertThat(daysDisplayData, hasDisplayItem("startDate", jan1)); + + CalendarWindows.MonthsWindows monthsWindow = CalendarWindows.months(2) + .withStartingMonth(1990, 1) + .withTimeZone(timeZone); + DisplayData monthsDisplayData = DisplayData.from(monthsWindow); + assertThat(monthsDisplayData, hasDisplayItem("numMonths", 2)); + assertThat(monthsDisplayData, hasDisplayItem("startDate", jan1)); + + CalendarWindows.YearsWindows yearsWindow = CalendarWindows.years(4) + .withStartingYear(1990) + .withTimeZone(timeZone); + DisplayData yearsDisplayData = DisplayData.from(yearsWindow); + assertThat(yearsDisplayData, hasDisplayItem("numYears", 4)); + assertThat(yearsDisplayData, hasDisplayItem("startDate", jan1)); + } } http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/450dd856/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/FixedWindowsTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/FixedWindowsTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/FixedWindowsTest.java index c8ee9ac..fc1caac 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/FixedWindowsTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/FixedWindowsTest.java @@ -19,6 +19,7 @@ package org.apache.beam.sdk.transforms.windowing; import static org.apache.beam.sdk.testing.WindowFnTestUtils.runWindowFn; import static org.apache.beam.sdk.testing.WindowFnTestUtils.set; +import static org.apache.beam.sdk.transforms.display.DisplayDataMatchers.hasDisplayItem; import static org.hamcrest.CoreMatchers.containsString; import static org.junit.Assert.assertEquals; @@ -28,6 +29,7 @@ import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; import org.apache.beam.sdk.testing.WindowFnTestUtils; +import org.apache.beam.sdk.transforms.display.DisplayData; import org.joda.time.Duration; import org.joda.time.Instant; @@ -123,4 +125,16 @@ public class FixedWindowsTest { FixedWindows.of(new Duration(500)), timestamp); } } + + @Test + public void testDisplayData() { + Duration offset = Duration.standardSeconds(1234); + Duration size = Duration.standardSeconds(2345); + + FixedWindows fixedWindows = FixedWindows.of(size).withOffset(offset); + DisplayData displayData = DisplayData.from(fixedWindows); + + assertThat(displayData, hasDisplayItem("size", size)); + assertThat(displayData, hasDisplayItem("offset", offset)); + } } http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/450dd856/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/OrFinallyTriggerTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/OrFinallyTriggerTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/OrFinallyTriggerTest.java index e4e9cb3..ea178a8 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/OrFinallyTriggerTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/OrFinallyTriggerTest.java @@ -207,4 +207,10 @@ public class OrFinallyTriggerTest { triggerB.getContinuationTrigger().orFinally(triggerA.getContinuationTrigger())), bOrFinallyA.getContinuationTrigger()); } + + @Test + public void testToString() { + Trigger trigger = StubTrigger.named("t1").orFinally(StubTrigger.named("t2")); + assertEquals("t1.orFinally(t2)", trigger.toString()); + } } http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/450dd856/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/RepeatedlyTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/RepeatedlyTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/RepeatedlyTest.java index 9b74767..ddb9f9a 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/RepeatedlyTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/RepeatedlyTest.java @@ -209,4 +209,17 @@ public class RepeatedlyTest { assertFalse(tester.shouldFire(window)); } + + @Test + public void testToString() { + Trigger trigger = Repeatedly.forever(new StubTrigger() { + @Override + public String toString() { + return "innerTrigger"; + } + }); + + assertEquals("Repeatedly.forever(innerTrigger)", trigger.toString()); + } + } http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/450dd856/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/SessionsTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/SessionsTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/SessionsTest.java index 932d416..a543359 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/SessionsTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/SessionsTest.java @@ -19,6 +19,7 @@ package org.apache.beam.sdk.transforms.windowing; import static org.apache.beam.sdk.testing.WindowFnTestUtils.runWindowFn; import static org.apache.beam.sdk.testing.WindowFnTestUtils.set; +import static org.apache.beam.sdk.transforms.display.DisplayDataMatchers.hasDisplayItem; import static org.hamcrest.Matchers.allOf; import static org.hamcrest.Matchers.containsString; @@ -27,6 +28,7 @@ import static org.junit.Assert.assertThat; import static org.junit.Assert.assertTrue; import org.apache.beam.sdk.testing.WindowFnTestUtils; +import org.apache.beam.sdk.transforms.display.DisplayData; import com.google.common.collect.ImmutableList; @@ -155,4 +157,12 @@ public class SessionsTest { (List<Long>) ImmutableList.of(1L, 3L), (List<Long>) ImmutableList.of(0L, 5L, 10L, 15L, 20L))); } + + @Test + public void testDisplayData() { + Duration gapDuration = Duration.standardMinutes(234); + Sessions session = Sessions.withGapDuration(gapDuration); + assertThat(DisplayData.from(session), + hasDisplayItem("gapDuration", gapDuration)); + } } http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/450dd856/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/SlidingWindowsTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/SlidingWindowsTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/SlidingWindowsTest.java index a310cb3..047a413 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/SlidingWindowsTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/SlidingWindowsTest.java @@ -19,12 +19,15 @@ package org.apache.beam.sdk.transforms.windowing; import static org.apache.beam.sdk.testing.WindowFnTestUtils.runWindowFn; import static org.apache.beam.sdk.testing.WindowFnTestUtils.set; +import static org.apache.beam.sdk.transforms.display.DisplayDataMatchers.hasDisplayItem; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertThat; import static org.junit.Assert.assertTrue; import org.apache.beam.sdk.testing.WindowFnTestUtils; +import org.apache.beam.sdk.transforms.display.DisplayData; import org.joda.time.Duration; import org.joda.time.Instant; @@ -192,4 +195,21 @@ public class SlidingWindowsTest { SlidingWindows.of(new Duration(1000)).every(new Duration(500)), timestamp); } } + + @Test + public void testDisplayData() { + Duration windowSize = Duration.standardSeconds(1234); + Duration offset = Duration.standardSeconds(2345); + Duration period = Duration.standardSeconds(3456); + + SlidingWindows slidingWindowFn = SlidingWindows + .of(windowSize) + .every(period) + .withOffset(offset); + + DisplayData displayData = DisplayData.from(slidingWindowFn); + assertThat(displayData, hasDisplayItem("size", windowSize)); + assertThat(displayData, hasDisplayItem("period", period)); + assertThat(displayData, hasDisplayItem("offset", offset)); + } } http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/450dd856/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/StubTrigger.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/StubTrigger.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/StubTrigger.java new file mode 100644 index 0000000..738c0bc --- /dev/null +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/StubTrigger.java @@ -0,0 +1,71 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.transforms.windowing; + +import com.google.api.client.util.Lists; +import org.joda.time.Instant; + +import java.util.List; + +/** + * No-op {@link OnceTrigger} implementation for testing. + */ +abstract class StubTrigger extends Trigger.OnceTrigger { + /** + * Create a stub {@link Trigger} instance which returns the specified name on {@link #toString()}. + */ + static StubTrigger named(final String name) { + return new StubTrigger() { + @Override + public String toString() { + return name; + } + }; + } + + protected StubTrigger() { + super(Lists.<Trigger>newArrayList()); + } + + @Override + protected void onOnlyFiring(TriggerContext context) throws Exception { + } + + @Override + public void onElement(OnElementContext c) throws Exception { + } + + @Override + public void onMerge(OnMergeContext c) throws Exception { + } + + @Override + public boolean shouldFire(TriggerContext context) throws Exception { + return false; + } + + @Override + protected Trigger getContinuationTrigger(List<Trigger> continuationTriggers) { + return null; + } + + @Override + public Instant getWatermarkThatGuaranteesFiring(BoundedWindow window) { + return null; + } +} http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/450dd856/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/TriggerTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/TriggerTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/TriggerTest.java index a8287a3..43c8bd8 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/TriggerTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/TriggerTest.java @@ -38,7 +38,7 @@ public class TriggerTest { @Test public void testTriggerToString() throws Exception { assertEquals("AfterWatermark.pastEndOfWindow()", AfterWatermark.pastEndOfWindow().toString()); - assertEquals("Repeatedly(AfterWatermark.pastEndOfWindow())", + assertEquals("Repeatedly.forever(AfterWatermark.pastEndOfWindow())", Repeatedly.forever(AfterWatermark.pastEndOfWindow()).toString()); } http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/450dd856/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/WindowTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/WindowTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/WindowTest.java index 359ad23..6be6df8 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/WindowTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/WindowTest.java @@ -17,7 +17,13 @@ */ package org.apache.beam.sdk.transforms.windowing; +import static org.apache.beam.sdk.transforms.display.DisplayDataMatchers.hasDisplayItem; +import static org.apache.beam.sdk.transforms.display.DisplayDataMatchers.hasKey; +import static org.apache.beam.sdk.transforms.display.DisplayDataMatchers.includes; + import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.isOneOf; +import static org.hamcrest.Matchers.not; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertThat; import static org.junit.Assert.assertTrue; @@ -33,6 +39,7 @@ import org.apache.beam.sdk.transforms.Create; import org.apache.beam.sdk.transforms.DoFn; import org.apache.beam.sdk.transforms.GroupByKey; import org.apache.beam.sdk.transforms.ParDo; +import org.apache.beam.sdk.transforms.display.DisplayData; import org.apache.beam.sdk.util.WindowingStrategy; import org.apache.beam.sdk.util.WindowingStrategy.AccumulationMode; import org.apache.beam.sdk.values.KV; @@ -224,4 +231,56 @@ public class WindowTest implements Serializable { pipeline.run(); } + + @Test + public void testDisplayData() { + FixedWindows windowFn = FixedWindows.of(Duration.standardHours(5)); + AfterWatermark.FromEndOfWindow triggerBuilder = AfterWatermark.pastEndOfWindow(); + Duration allowedLateness = Duration.standardMinutes(10); + Window.ClosingBehavior closingBehavior = Window.ClosingBehavior.FIRE_IF_NON_EMPTY; + OutputTimeFn<BoundedWindow> outputTimeFn = OutputTimeFns.outputAtEndOfWindow(); + + Window.Bound<?> window = Window + .into(windowFn) + .triggering(triggerBuilder) + .accumulatingFiredPanes() + .withAllowedLateness(allowedLateness, closingBehavior) + .withOutputTimeFn(outputTimeFn); + + DisplayData displayData = DisplayData.from(window); + + assertThat(displayData, hasDisplayItem("windowFn", windowFn.getClass())); + assertThat(displayData, includes(windowFn)); + + assertThat(displayData, hasDisplayItem("trigger", triggerBuilder.toString())); + assertThat(displayData, + hasDisplayItem("accumulationMode", AccumulationMode.ACCUMULATING_FIRED_PANES.toString())); + assertThat(displayData, + hasDisplayItem("allowedLateness", allowedLateness)); + assertThat(displayData, hasDisplayItem("closingBehavior", closingBehavior.toString())); + assertThat(displayData, hasDisplayItem("outputTimeFn", outputTimeFn.getClass())); + } + + @Test + public void testDisplayDataExcludesUnspecifiedProperties() { + Window.Bound<?> window = Window.into(new GlobalWindows()); + + DisplayData displayData = DisplayData.from(window); + assertThat(displayData, not(hasDisplayItem(hasKey(isOneOf( + "trigger", + "outputTimeFn", + "accumulationMode", + "allowedLateness", + "closingBehavior"))))); + + } + + @Test + public void testDisplayDataExcludesDefaultTrigger() { + Window.Bound<?> window = Window.into(new GlobalWindows()) + .triggering(DefaultTrigger.of()); + + DisplayData data = DisplayData.from(window); + assertThat(data, not(hasDisplayItem(hasKey("trigger")))); + } } http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/450dd856/sdks/java/core/src/test/java/org/apache/beam/sdk/util/ReshuffleTriggerTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/ReshuffleTriggerTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/ReshuffleTriggerTest.java index cabde7f..b17ce81 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/util/ReshuffleTriggerTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/util/ReshuffleTriggerTest.java @@ -17,12 +17,14 @@ */ package org.apache.beam.sdk.util; +import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertTrue; import org.apache.beam.sdk.transforms.windowing.BoundedWindow; import org.apache.beam.sdk.transforms.windowing.FixedWindows; import org.apache.beam.sdk.transforms.windowing.IntervalWindow; +import org.apache.beam.sdk.transforms.windowing.Trigger; import org.joda.time.Duration; import org.joda.time.Instant; @@ -36,7 +38,7 @@ import org.junit.runners.JUnit4; @RunWith(JUnit4.class) public class ReshuffleTriggerTest { - /** Public so that other tests can instantiate ReshufleTrigger. */ + /** Public so that other tests can instantiate {@link ReshuffleTrigger}. */ public static <W extends BoundedWindow> ReshuffleTrigger<W> forTest() { return new ReshuffleTrigger<>(); } @@ -57,4 +59,10 @@ public class ReshuffleTriggerTest { tester.fireIfShouldFire(arbitraryWindow); assertFalse(tester.isMarkedFinished(arbitraryWindow)); } + + @Test + public void testToString() { + Trigger trigger = new ReshuffleTrigger<>(); + assertEquals("ReshuffleTrigger()", trigger.toString()); + } }