Add inEarlyGlobalWindowPanes as a PAssert Extractor. This is for use in asserting the contents of speculative (early) panes in the global window.
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/f15fab8c Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/f15fab8c Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/f15fab8c Branch: refs/heads/master Commit: f15fab8ccdb3b40004583e8f7e4e32a0b8ba5121 Parents: bfa3b70 Author: Thomas Groh <tg...@google.com> Authored: Thu Aug 11 15:46:10 2016 -0700 Committer: Luke Cwik <lc...@google.com> Committed: Fri Aug 19 09:04:18 2016 -0700 ---------------------------------------------------------------------- .../java/org/apache/beam/sdk/testing/PAssert.java | 18 ++++++++++++++++++ .../apache/beam/sdk/testing/PaneExtractors.java | 18 ++++++++++++++++++ 2 files changed, 36 insertions(+) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/f15fab8c/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/PAssert.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/PAssert.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/PAssert.java index e07ee3d..3f1a741 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/PAssert.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/PAssert.java @@ -49,6 +49,7 @@ import org.apache.beam.sdk.transforms.windowing.BoundedWindow; import org.apache.beam.sdk.transforms.windowing.GlobalWindow; import org.apache.beam.sdk.transforms.windowing.GlobalWindows; import org.apache.beam.sdk.transforms.windowing.Never; +import org.apache.beam.sdk.transforms.windowing.PaneInfo.Timing; import org.apache.beam.sdk.transforms.windowing.Trigger; import org.apache.beam.sdk.transforms.windowing.Window; import org.apache.beam.sdk.transforms.windowing.Window.ClosingBehavior; @@ -176,6 +177,13 @@ public class PAssert { IterableAssert<T> 
inCombinedNonLatePanes(BoundedWindow window); /** + * Creates a new {@link IterableAssert} like this one, but with the assertion restricted to only + * run on panes in the {@link GlobalWindow} that were emitted before the {@link GlobalWindow} + * closed. These panes have {@link Timing#EARLY}. + */ + IterableAssert<T> inEarlyGlobalWindowPanes(); + + /** * Asserts that the iterable in question contains the provided elements. * * @return the same {@link IterableAssert} builder for further assertions @@ -381,6 +389,11 @@ public class PAssert { return withPane(window, PaneExtractors.<T>nonLatePanes()); } + @Override + public IterableAssert<T> inEarlyGlobalWindowPanes() { + return withPane(GlobalWindow.INSTANCE, PaneExtractors.<T>earlyPanes()); + } + private PCollectionContentsAssert<T> withPane( BoundedWindow window, SimpleFunction<Iterable<WindowedValue<T>>, Iterable<T>> paneExtractor) { @@ -557,6 +570,11 @@ public class PAssert { return withPanes(window, PaneExtractors.<Iterable<T>>nonLatePanes()); } + @Override + public IterableAssert<T> inEarlyGlobalWindowPanes() { + return withPanes(GlobalWindow.INSTANCE, PaneExtractors.<Iterable<T>>earlyPanes()); + } + private PCollectionSingletonIterableAssert<T> withPanes( BoundedWindow window, SimpleFunction<Iterable<WindowedValue<Iterable<T>>>, Iterable<Iterable<T>>> paneExtractor) { http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/f15fab8c/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/PaneExtractors.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/PaneExtractors.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/PaneExtractors.java index f699bfc..899612b 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/PaneExtractors.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/PaneExtractors.java @@ -59,6 +59,10 @@ final class PaneExtractors { return new 
ExtractNonLatePanes<>(); } + static <T> SimpleFunction<Iterable<WindowedValue<T>>, Iterable<T>> earlyPanes() { + return new ExtractEarlyPanes<>(); + } + static <T> SimpleFunction<Iterable<WindowedValue<T>>, Iterable<T>> allPanes() { return new ExtractAllPanes<>(); } @@ -137,4 +141,18 @@ final class PaneExtractors { return outputs; } } + + private static class ExtractEarlyPanes<T> + extends SimpleFunction<Iterable<WindowedValue<T>>, Iterable<T>> { + @Override + public Iterable<T> apply(Iterable<WindowedValue<T>> input) { + List<T> outputs = new ArrayList<>(); + for (WindowedValue<T> value : input) { + if (value.getPane().getTiming() == PaneInfo.Timing.EARLY) { + outputs.add(value.getValue()); + } + } + return outputs; + } + } }