Repository: beam Updated Branches: refs/heads/master f54072a1b -> 0064fb37a
[BEAM-2314] Add ValidatesRunner test for merging custom windows Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/dfa983ce Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/dfa983ce Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/dfa983ce Branch: refs/heads/master Commit: dfa983ce4adb85d211497460254b6a95944ce869 Parents: f54072a Author: Etienne Chauchot <echauc...@gmail.com> Authored: Mon May 29 12:05:51 2017 +0200 Committer: Aviem Zur <aviem...@gmail.com> Committed: Mon Jul 24 14:33:00 2017 +0300 ---------------------------------------------------------------------- runners/spark/pom.xml | 3 +- .../sdk/testing/UsesCustomWindowMerging.java | 23 +++ .../sdk/transforms/windowing/WindowTest.java | 184 +++++++++++++++++++ 3 files changed, 209 insertions(+), 1 deletion(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/beam/blob/dfa983ce/runners/spark/pom.xml ---------------------------------------------------------------------- diff --git a/runners/spark/pom.xml b/runners/spark/pom.xml index 7f70204..35e933b 100644 --- a/runners/spark/pom.xml +++ b/runners/spark/pom.xml @@ -77,7 +77,8 @@ <excludedGroups> org.apache.beam.sdk.testing.UsesSplittableParDo, org.apache.beam.sdk.testing.UsesCommittedMetrics, - org.apache.beam.sdk.testing.UsesTestStream + org.apache.beam.sdk.testing.UsesTestStream, + org.apache.beam.sdk.testing.UsesCustomWindowMerging </excludedGroups> <parallel>none</parallel> <forkCount>1</forkCount> http://git-wip-us.apache.org/repos/asf/beam/blob/dfa983ce/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/UsesCustomWindowMerging.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/UsesCustomWindowMerging.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/UsesCustomWindowMerging.java new file mode 100644 
index 0000000..fc40e02 --- /dev/null +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/UsesCustomWindowMerging.java @@ -0,0 +1,23 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.testing; + +/** + * Category tag for validation tests which utilize custom window merging. 
+ */ +public interface UsesCustomWindowMerging {} http://git-wip-us.apache.org/repos/asf/beam/blob/dfa983ce/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/WindowTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/WindowTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/WindowTest.java index 65af7a1..5b6d046 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/WindowTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/windowing/WindowTest.java @@ -31,19 +31,30 @@ import static org.junit.Assert.assertTrue; import static org.mockito.Mockito.when; import com.google.common.collect.Iterables; +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; import java.io.Serializable; +import java.util.ArrayList; import java.util.Collection; import java.util.Collections; +import java.util.List; +import java.util.Objects; import java.util.concurrent.atomic.AtomicBoolean; import org.apache.beam.sdk.Pipeline.PipelineVisitor; import org.apache.beam.sdk.coders.Coder; import org.apache.beam.sdk.coders.Coder.NonDeterministicException; +import org.apache.beam.sdk.coders.CustomCoder; import org.apache.beam.sdk.coders.StringUtf8Coder; +import org.apache.beam.sdk.coders.VarIntCoder; import org.apache.beam.sdk.io.GenerateSequence; import org.apache.beam.sdk.runners.TransformHierarchy; import org.apache.beam.sdk.testing.PAssert; import org.apache.beam.sdk.testing.TestPipeline; +import org.apache.beam.sdk.testing.UsesCustomWindowMerging; import org.apache.beam.sdk.testing.ValidatesRunner; +import org.apache.beam.sdk.transforms.Combine; +import org.apache.beam.sdk.transforms.Count; import org.apache.beam.sdk.transforms.Create; import org.apache.beam.sdk.transforms.DoFn; import org.apache.beam.sdk.transforms.GroupByKey; @@ -570,4 +581,177 @@ public 
class WindowTest implements Serializable { assertThat(data, not(hasDisplayItem("trigger"))); assertThat(data, not(hasDisplayItem("allowedLateness"))); } + @Test + @Category({ValidatesRunner.class, UsesCustomWindowMerging.class}) + public void testMergingCustomWindows() { + Instant startInstant = new Instant(0L); + List<TimestampedValue<String>> input = new ArrayList<>(); + input.add(TimestampedValue.of("big", startInstant.plus(Duration.standardSeconds(10)))); + input.add(TimestampedValue.of("small1", startInstant.plus(Duration.standardSeconds(20)))); + // This one will be outside of bigWindow thus not merged + input.add(TimestampedValue.of("small2", startInstant.plus(Duration.standardSeconds(39)))); + PCollection<String> inputCollection = pipeline.apply(Create.timestamped(input)); + PCollection<String> windowedCollection = inputCollection + .apply(Window.into(new CustomWindowFn<String>())); + PCollection<Long> count = windowedCollection + .apply(Combine.globally(Count.<String>combineFn()).withoutDefaults()); + // "small1" and "big" elements merged into bigWindow "small2" not merged + // because timestamp is not in bigWindow + PAssert.that("Wrong number of elements in output collection", count).containsInAnyOrder(2L, 1L); + pipeline.run(); + } + + // This test is useful because some runners have a special merge implementation + // for keyed collections + @Test + @Category({ValidatesRunner.class, UsesCustomWindowMerging.class}) + public void testMergingCustomWindowsKeyedCollection() { + Instant startInstant = new Instant(0L); + List<TimestampedValue<KV<Integer, String>>> input = new ArrayList<>(); + input + .add(TimestampedValue.of(KV.of(0, "big"), startInstant.plus(Duration.standardSeconds(10)))); + input.add( + TimestampedValue.of(KV.of(1, "small1"), startInstant.plus(Duration.standardSeconds(20)))); + // This one will be outside of bigWindow thus not merged + input.add( + TimestampedValue.of(KV.of(2, "small2"), startInstant.plus(Duration.standardSeconds(39)))); + 
PCollection<KV<Integer, String>> inputCollection = pipeline.apply(Create.timestamped(input)); + PCollection<KV<Integer, String>> windowedCollection = inputCollection + .apply(Window.into(new CustomWindowFn<KV<Integer, String>>())); + PCollection<Long> count = windowedCollection + .apply(Combine.globally(Count.<KV<Integer, String>>combineFn()).withoutDefaults()); + // "small1" and "big" elements merged into bigWindow "small2" not merged + // because timestamp is not in bigWindow + PAssert.that("Wrong number of elements in output collection", count).containsInAnyOrder(2L, 1L); + pipeline.run(); + } + + private static class CustomWindow extends IntervalWindow { + + private boolean isBig; + + + CustomWindow(Instant start, Instant end) { + super(start, end); + this.isBig = false; + } + + CustomWindow(Instant start, Instant end, boolean isBig) { + super(start, end); + this.isBig = isBig; + } + + @Override public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + if (!super.equals(o)) { + return false; + } + CustomWindow that = (CustomWindow) o; + return isBig == that.isBig; + } + + @Override public int hashCode() { + return Objects.hash(super.hashCode(), isBig); + } + } + + private static class CustomWindowCoder extends + CustomCoder<CustomWindow> { + + private static final CustomWindowCoder INSTANCE = new CustomWindowCoder(); + private static final Coder<IntervalWindow> INTERVAL_WINDOW_CODER = IntervalWindow.getCoder(); + private static final VarIntCoder VAR_INT_CODER = VarIntCoder.of(); + + public static CustomWindowCoder of() { + return INSTANCE; + } + + @Override + public void encode(CustomWindow window, OutputStream outStream) + throws IOException { + INTERVAL_WINDOW_CODER.encode(window, outStream); + VAR_INT_CODER.encode(window.isBig ? 
1 : 0, outStream); + } + + @Override + public CustomWindow decode(InputStream inStream) throws IOException { + IntervalWindow superWindow = INTERVAL_WINDOW_CODER.decode(inStream); + boolean isBig = VAR_INT_CODER.decode(inStream) != 0; + return new CustomWindow(superWindow.start(), superWindow.end(), isBig); + } + + @Override + public void verifyDeterministic() throws NonDeterministicException { + INTERVAL_WINDOW_CODER.verifyDeterministic(); + VAR_INT_CODER.verifyDeterministic(); + } + } + + private static class CustomWindowFn<T> extends WindowFn<T, CustomWindow> { + + @Override public Collection<CustomWindow> assignWindows(AssignContext c) throws Exception { + String element; + // It loses genericity of type T but this is not a big deal for a test. + // And it allows to avoid duplicating CustomWindowFn to support PCollection<KV> + if (c.element() instanceof KV){ + element = ((KV<Integer, String>) c.element()).getValue(); + } else { + element = (String) c.element(); + } + // put big elements in windows of 30s and small ones in windows of 5s + if ("big".equals(element)) { + return Collections.singletonList( + new CustomWindow(c.timestamp(), c.timestamp().plus(Duration.standardSeconds(30)), + true)); + } else { + return Collections.singletonList( + new CustomWindow(c.timestamp(), c.timestamp().plus(Duration.standardSeconds(5)), + false)); + } + } + + @Override + public void mergeWindows(MergeContext c) throws Exception { + List<CustomWindow> toBeMerged = new ArrayList<>(); + CustomWindow bigWindow = null; + for (CustomWindow customWindow : c.windows()) { + if (customWindow.isBig) { + bigWindow = customWindow; + toBeMerged.add(customWindow); + } else if (bigWindow != null + && customWindow.start().isAfter(bigWindow.start()) + && customWindow.end().isBefore(bigWindow.end())) { + toBeMerged.add(customWindow); + } + } + // in case bigWindow has not been seen yet + if (bigWindow != null) { + // merge small windows into big windows + c.merge(toBeMerged, bigWindow); + } + } 
+ + @Override + public boolean isCompatible(WindowFn<?, ?> other) { + return other instanceof CustomWindowFn; + } + + @Override + public Coder<CustomWindow> windowCoder() { + return CustomWindowCoder.of(); + } + + @Override + public WindowMappingFn<CustomWindow> getDefaultWindowMappingFn() { + throw new UnsupportedOperationException("side inputs not supported"); + } + + + } + }