Explodes windows before GBKIKWI. Also: (1) adds a test for windowed side inputs that requires this behavior; (2) adds a test category for SDF with windowed side input. Runners should gradually implement this properly. For now, only the direct runner implements this properly.
Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/6ac3ac50 Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/6ac3ac50 Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/6ac3ac50 Branch: refs/heads/DSL_SQL Commit: 6ac3ac50fec2eb02927c0a07ca928967cfef5652 Parents: b93de58 Author: Eugene Kirpichov <kirpic...@google.com> Authored: Mon Apr 17 11:28:24 2017 -0700 Committer: Eugene Kirpichov <kirpic...@google.com> Committed: Tue Apr 18 18:02:07 2017 -0700 ---------------------------------------------------------------------- .../beam/runners/core/SplittableParDo.java | 75 +++++++++--------- .../beam/runners/core/SplittableParDoTest.java | 82 +++++++------------- runners/flink/runner/pom.xml | 3 +- ...esSplittableParDoWithWindowedSideInputs.java | 26 +++++++ .../beam/sdk/transforms/SplittableDoFnTest.java | 41 ++++++++++ 5 files changed, 137 insertions(+), 90 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/beam/blob/6ac3ac50/runners/core-java/src/main/java/org/apache/beam/runners/core/SplittableParDo.java ---------------------------------------------------------------------- diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/SplittableParDo.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/SplittableParDo.java index 44db1f7..31d89ee 100644 --- a/runners/core-java/src/main/java/org/apache/beam/runners/core/SplittableParDo.java +++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/SplittableParDo.java @@ -19,10 +19,8 @@ package org.apache.beam.runners.core; import static com.google.common.base.Preconditions.checkArgument; import static com.google.common.base.Preconditions.checkNotNull; -import static com.google.common.base.Preconditions.checkState; import com.google.common.annotations.VisibleForTesting; -import com.google.common.collect.ImmutableList; import 
com.google.common.collect.Iterables; import java.util.List; import java.util.UUID; @@ -138,6 +136,12 @@ public class SplittableParDo<InputT, OutputT, RestrictionT> .setCoder(splitCoder) .apply("Split restriction", ParDo.of(new SplitRestrictionFn<InputT, RestrictionT>(fn))) .setCoder(splitCoder) + // ProcessFn requires all input elements to be in a single window and have a single + // element per work item. This must precede the unique keying so each key has a single + // associated element. + .apply( + "Explode windows", + ParDo.of(new ExplodeWindowsFn<ElementAndRestriction<InputT, RestrictionT>>())) .apply( "Assign unique key", WithKeys.of(new RandomUniqueKeyFn<ElementAndRestriction<InputT, RestrictionT>>())) @@ -158,6 +162,18 @@ public class SplittableParDo<InputT, OutputT, RestrictionT> } /** + * A {@link DoFn} that forces each of its outputs to be in a single window, by indicating to the + * runner that it observes the window of its input element, so the runner is forced to apply it to + * each input in a single window and thus its output is also in a single window. + */ + private static class ExplodeWindowsFn<InputT> extends DoFn<InputT, InputT> { + @ProcessElement + public void process(ProcessContext c, BoundedWindow window) { + c.output(c.element()); + } + } + + /** * Runner-specific primitive {@link GroupByKey GroupByKey-like} {@link PTransform} that produces * {@link KeyedWorkItem KeyedWorkItems} so that downstream transforms can access state and timers. * @@ -317,6 +333,13 @@ public class SplittableParDo<InputT, OutputT, RestrictionT> * The heart of splittable {@link DoFn} execution: processes a single (element, restriction) pair * by creating a tracker for the restriction and checkpointing/resuming processing later if * necessary. + * + * <p>Takes {@link KeyedWorkItem} and assumes that the KeyedWorkItem contains a single element + * (or a single timer set by {@link ProcessFn itself}, in a single window. 
This is necessary + * because {@link ProcessFn} sets timers, and timers are namespaced to a single window and it + * should be the window of the input element. + * + * <p>See also: https://issues.apache.org/jira/browse/BEAM-1983 */ @VisibleForTesting public static class ProcessFn< @@ -441,7 +464,18 @@ public class SplittableParDo<InputT, OutputT, RestrictionT> // Subsequent calls are timer firings and the element has to be retrieved from the state. TimerInternals.TimerData timer = Iterables.getOnlyElement(c.element().timersIterable(), null); boolean isSeedCall = (timer == null); - StateNamespace stateNamespace = isSeedCall ? StateNamespaces.global() : timer.getNamespace(); + StateNamespace stateNamespace; + if (isSeedCall) { + WindowedValue<ElementAndRestriction<InputT, RestrictionT>> windowedValue = + Iterables.getOnlyElement(c.element().elementsIterable()); + BoundedWindow window = Iterables.getOnlyElement(windowedValue.getWindows()); + stateNamespace = + StateNamespaces.window( + (Coder<BoundedWindow>) inputWindowingStrategy.getWindowFn().windowCoder(), window); + } else { + stateNamespace = timer.getNamespace(); + } + ValueState<WindowedValue<InputT>> elementState = stateInternals.state(stateNamespace, elementTag); ValueState<RestrictionT> restrictionState = @@ -451,15 +485,8 @@ public class SplittableParDo<InputT, OutputT, RestrictionT> ElementAndRestriction<WindowedValue<InputT>, RestrictionT> elementAndRestriction; if (isSeedCall) { - // The element and restriction are available in c.element(). - // elementsIterable() will, by construction of SplittableParDo, contain the same value - // potentially in several different windows. We implode this into a single WindowedValue - // in order to simplify the rest of the code and avoid iterating over elementsIterable() - // explicitly. The windows of this WindowedValue will be propagated to windows of the - // output. This is correct because a splittable DoFn is not allowed to inspect the window - // of its element. 
WindowedValue<ElementAndRestriction<InputT, RestrictionT>> windowedValue = - implodeWindows(c.element().elementsIterable()); + Iterables.getOnlyElement(c.element().elementsIterable()); WindowedValue<InputT> element = windowedValue.withValue(windowedValue.getValue().element()); elementState.write(element); elementAndRestriction = @@ -498,32 +525,6 @@ public class SplittableParDo<InputT, OutputT, RestrictionT> stateNamespace, timerInternals.currentProcessingTime(), TimeDomain.PROCESSING_TIME)); } - /** - * Does the opposite of {@link WindowedValue#explodeWindows()} - creates a single {@link - * WindowedValue} from a collection of {@link WindowedValue}'s that is known to contain copies - * of the same value with the same timestamp, but different window sets. - * - * <p>This is only legal to do because we know that {@link RandomUniqueKeyFn} created unique - * keys for every {@link ElementAndRestriction}, so if there's multiple {@link WindowedValue}'s - * for the same key, that means only that the windows of that {@link ElementAndRestriction} are - * being delivered separately rather than all at once. It is also legal to do because splittable - * {@link DoFn} is not allowed to access the window of its element, so we can propagate the full - * set of windows of its input to its output. 
- */ - private static <InputT, RestrictionT> - WindowedValue<ElementAndRestriction<InputT, RestrictionT>> implodeWindows( - Iterable<WindowedValue<ElementAndRestriction<InputT, RestrictionT>>> values) { - WindowedValue<ElementAndRestriction<InputT, RestrictionT>> first = - Iterables.getFirst(values, null); - checkState(first != null, "Got a KeyedWorkItem with no elements and no timers"); - ImmutableList.Builder<BoundedWindow> windows = ImmutableList.builder(); - for (WindowedValue<ElementAndRestriction<InputT, RestrictionT>> value : values) { - windows.addAll(value.getWindows()); - } - return WindowedValue.of( - first.getValue(), first.getTimestamp(), windows.build(), first.getPane()); - } - private DoFn<InputT, OutputT>.Context wrapContext(final Context baseContext) { return fn.new Context() { @Override http://git-wip-us.apache.org/repos/asf/beam/blob/6ac3ac50/runners/core-java/src/test/java/org/apache/beam/runners/core/SplittableParDoTest.java ---------------------------------------------------------------------- diff --git a/runners/core-java/src/test/java/org/apache/beam/runners/core/SplittableParDoTest.java b/runners/core-java/src/test/java/org/apache/beam/runners/core/SplittableParDoTest.java index 5629635..1a44453 100644 --- a/runners/core-java/src/test/java/org/apache/beam/runners/core/SplittableParDoTest.java +++ b/runners/core-java/src/test/java/org/apache/beam/runners/core/SplittableParDoTest.java @@ -30,6 +30,7 @@ import java.io.Serializable; import java.util.ArrayList; import java.util.Arrays; import java.util.Collection; +import java.util.Collections; import java.util.List; import java.util.NoSuchElementException; import java.util.concurrent.Executors; @@ -194,11 +195,6 @@ public class SplittableParDoTest { // ------------------------------- Tests for ProcessFn --------------------------------- - enum WindowExplosion { - EXPLODE_WINDOWS, - DO_NOT_EXPLODE_WINDOWS - } - /** * A helper for testing {@link SplittableParDo.ProcessFn} on 1 element (but 
possibly over multiple * {@link DoFn.ProcessElement} calls). @@ -293,24 +289,13 @@ public class SplittableParDoTest { ElementAndRestriction.of(element, restriction), currentProcessingTime, GlobalWindow.INSTANCE, - PaneInfo.ON_TIME_AND_ONLY_FIRING), - WindowExplosion.DO_NOT_EXPLODE_WINDOWS); + PaneInfo.ON_TIME_AND_ONLY_FIRING)); } - void startElement( - WindowedValue<ElementAndRestriction<InputT, RestrictionT>> windowedValue, - WindowExplosion explosion) + void startElement(WindowedValue<ElementAndRestriction<InputT, RestrictionT>> windowedValue) throws Exception { - switch (explosion) { - case EXPLODE_WINDOWS: - tester.processElement( - KeyedWorkItems.elementsWorkItem("key", windowedValue.explodeWindows())); - break; - case DO_NOT_EXPLODE_WINDOWS: - tester.processElement( - KeyedWorkItems.elementsWorkItem("key", Arrays.asList(windowedValue))); - break; - } + tester.processElement( + KeyedWorkItems.elementsWorkItem("key", Collections.singletonList(windowedValue))); } /** @@ -394,46 +379,39 @@ public class SplittableParDoTest { } @Test - public void testTrivialProcessFnPropagatesOutputsWindowsAndTimestamp() throws Exception { - // Tests that ProcessFn correctly propagates windows and timestamp of the element + public void testTrivialProcessFnPropagatesOutputWindowAndTimestamp() throws Exception { + // Tests that ProcessFn correctly propagates the window and timestamp of the element // inside the KeyedWorkItem. // The underlying DoFn is actually monolithic, so this doesn't test splitting. 
DoFn<Integer, String> fn = new ToStringFn(); Instant base = Instant.now(); - IntervalWindow w1 = + IntervalWindow w = new IntervalWindow( base.minus(Duration.standardMinutes(1)), base.plus(Duration.standardMinutes(1))); - IntervalWindow w2 = - new IntervalWindow( - base.minus(Duration.standardMinutes(2)), base.plus(Duration.standardMinutes(2))); - IntervalWindow w3 = - new IntervalWindow( - base.minus(Duration.standardMinutes(3)), base.plus(Duration.standardMinutes(3))); - - for (WindowExplosion explosion : WindowExplosion.values()) { - ProcessFnTester<Integer, String, SomeRestriction, SomeRestrictionTracker> tester = - new ProcessFnTester<>( - base, fn, BigEndianIntegerCoder.of(), SerializableCoder.of(SomeRestriction.class), - MAX_OUTPUTS_PER_BUNDLE, MAX_BUNDLE_DURATION); - tester.startElement( - WindowedValue.of( - ElementAndRestriction.of(42, new SomeRestriction()), - base, - Arrays.asList(w1, w2, w3), - PaneInfo.ON_TIME_AND_ONLY_FIRING), - explosion); - - for (IntervalWindow w : new IntervalWindow[] {w1, w2, w3}) { - assertEquals( - Arrays.asList( - TimestampedValue.of("42a", base), - TimestampedValue.of("42b", base), - TimestampedValue.of("42c", base)), - tester.peekOutputElementsInWindow(w)); - } - } + + ProcessFnTester<Integer, String, SomeRestriction, SomeRestrictionTracker> tester = + new ProcessFnTester<>( + base, + fn, + BigEndianIntegerCoder.of(), + SerializableCoder.of(SomeRestriction.class), + MAX_OUTPUTS_PER_BUNDLE, + MAX_BUNDLE_DURATION); + tester.startElement( + WindowedValue.of( + ElementAndRestriction.of(42, new SomeRestriction()), + base, + Collections.singletonList(w), + PaneInfo.ON_TIME_AND_ONLY_FIRING)); + + assertEquals( + Arrays.asList( + TimestampedValue.of("42a", base), + TimestampedValue.of("42b", base), + TimestampedValue.of("42c", base)), + tester.peekOutputElementsInWindow(w)); } private static class WatermarkUpdateFn extends DoFn<Instant, String> { http://git-wip-us.apache.org/repos/asf/beam/blob/6ac3ac50/runners/flink/runner/pom.xml 
---------------------------------------------------------------------- diff --git a/runners/flink/runner/pom.xml b/runners/flink/runner/pom.xml index 95880f4..1e6452d 100644 --- a/runners/flink/runner/pom.xml +++ b/runners/flink/runner/pom.xml @@ -91,7 +91,8 @@ org.apache.beam.sdk.testing.UsesMapState, org.apache.beam.sdk.testing.UsesAttemptedMetrics, org.apache.beam.sdk.testing.UsesCommittedMetrics, - org.apache.beam.sdk.testing.UsesTestStream + org.apache.beam.sdk.testing.UsesTestStream, + org.apache.beam.sdk.testing.UsesSplittableParDoWithWindowedSideInputs </excludedGroups> <parallel>none</parallel> <failIfNoTests>true</failIfNoTests> http://git-wip-us.apache.org/repos/asf/beam/blob/6ac3ac50/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/UsesSplittableParDoWithWindowedSideInputs.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/UsesSplittableParDoWithWindowedSideInputs.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/UsesSplittableParDoWithWindowedSideInputs.java new file mode 100644 index 0000000..2b1d673 --- /dev/null +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/testing/UsesSplittableParDoWithWindowedSideInputs.java @@ -0,0 +1,26 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.testing; + +import org.apache.beam.sdk.transforms.ParDo; + +/** + * Category tag for validation tests which utilize splittable {@link ParDo} and use + * windowed side inputs. + */ +public interface UsesSplittableParDoWithWindowedSideInputs {} http://git-wip-us.apache.org/repos/asf/beam/blob/6ac3ac50/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/SplittableDoFnTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/SplittableDoFnTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/SplittableDoFnTest.java index 30329f4..a0f1fd3 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/SplittableDoFnTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/SplittableDoFnTest.java @@ -33,6 +33,7 @@ import org.apache.beam.sdk.testing.PAssert; import org.apache.beam.sdk.testing.TestPipeline; import org.apache.beam.sdk.testing.TestStream; import org.apache.beam.sdk.testing.UsesSplittableParDo; +import org.apache.beam.sdk.testing.UsesSplittableParDoWithWindowedSideInputs; import org.apache.beam.sdk.testing.UsesTestStream; import org.apache.beam.sdk.testing.ValidatesRunner; import org.apache.beam.sdk.transforms.DoFn.BoundedPerElement; @@ -252,6 +253,46 @@ public class SplittableDoFnTest implements Serializable { p.run(); } + @Test + @Category({ + ValidatesRunner.class, + UsesSplittableParDo.class, + UsesSplittableParDoWithWindowedSideInputs.class + }) + public void testWindowedSideInput() throws Exception { + PCollection<Integer> mainInput = + p.apply("main", + Create.timestamped( + TimestampedValue.of(0, new Instant(0)), + TimestampedValue.of(1, new Instant(1)), + TimestampedValue.of(2, new Instant(2)), + TimestampedValue.of(3, new Instant(3)), + TimestampedValue.of(4, new 
Instant(4)), + TimestampedValue.of(5, new Instant(5)), + TimestampedValue.of(6, new Instant(6)), + TimestampedValue.of(7, new Instant(7)))) + .apply("window 2", Window.<Integer>into(FixedWindows.of(Duration.millis(2)))); + + PCollectionView<String> sideInput = + p.apply("side", + Create.timestamped( + TimestampedValue.of("a", new Instant(0)), + TimestampedValue.of("b", new Instant(4)))) + .apply("window 4", Window.<String>into(FixedWindows.of(Duration.millis(4)))) + .apply("singleton", View.<String>asSingleton()); + + PCollection<String> res = + mainInput.apply(ParDo.of(new SDFWithSideInput(sideInput)).withSideInputs(sideInput)); + + PAssert.that(res).containsInAnyOrder("a:0", "a:1", "a:2", "a:3", "b:4", "b:5", "b:6", "b:7"); + + p.run(); + + // TODO: also add test coverage when the SDF checkpoints - the resumed call should also + // properly access side inputs. + // TODO: also test coverage when some of the windows of the side input are not ready. + } + private static class SDFWithAdditionalOutput extends DoFn<Integer, String> { private final TupleTag<String> additionalOutput;