Supports window parameter in DoFnTester. Also prohibits other parameters, and prohibits output from bundle methods (whereas previously it was silently dropped).
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/78ac009b Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/78ac009b Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/78ac009b Branch: refs/heads/master Commit: 78ac009be743a2e053580e9966f841174b636e88 Parents: 9645576 Author: Eugene Kirpichov <kirpic...@google.com> Authored: Fri Dec 2 11:39:48 2016 -0800 Committer: Kenneth Knowles <k...@google.com> Committed: Fri Dec 2 15:42:33 2016 -0800 ---------------------------------------------------------------------- .../apache/beam/sdk/transforms/DoFnTester.java | 166 ++++++++++++++----- .../beam/sdk/transforms/DoFnTesterTest.java | 34 ++++ 2 files changed, 158 insertions(+), 42 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/78ac009b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnTester.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnTester.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnTester.java index a9f93dd..7c1abef 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnTester.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFnTester.java @@ -38,13 +38,18 @@ import org.apache.beam.sdk.testing.ValueInSingleWindow; import org.apache.beam.sdk.transforms.Combine.CombineFn; import org.apache.beam.sdk.transforms.reflect.DoFnInvoker; import org.apache.beam.sdk.transforms.reflect.DoFnInvokers; +import org.apache.beam.sdk.transforms.reflect.DoFnSignature; +import org.apache.beam.sdk.transforms.reflect.DoFnSignatures; +import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker; import org.apache.beam.sdk.transforms.windowing.BoundedWindow; import 
org.apache.beam.sdk.transforms.windowing.GlobalWindow; import org.apache.beam.sdk.transforms.windowing.PaneInfo; import org.apache.beam.sdk.util.SerializableUtils; +import org.apache.beam.sdk.util.Timer; import org.apache.beam.sdk.util.TimerInternals; import org.apache.beam.sdk.util.UserCodeException; import org.apache.beam.sdk.util.WindowedValue; +import org.apache.beam.sdk.util.WindowingInternals; import org.apache.beam.sdk.util.state.InMemoryStateInternals; import org.apache.beam.sdk.util.state.InMemoryTimerInternals; import org.apache.beam.sdk.util.state.StateInternals; @@ -84,6 +89,9 @@ public class DoFnTester<InputT, OutputT> implements AutoCloseable { /** * Returns a {@code DoFnTester} supporting unit-testing of the given * {@link DoFn}. By default, uses {@link CloningBehavior#CLONE_ONCE}. + * + * <p>The only supported extra parameter of the {@link DoFn.ProcessElement} method is + * {@link BoundedWindow}. */ @SuppressWarnings("unchecked") public static <InputT, OutputT> DoFnTester<InputT, OutputT> of(DoFn<InputT, OutputT> fn) { @@ -236,7 +244,7 @@ public class DoFnTester<InputT, OutputT> implements AutoCloseable { if (state == State.UNINITIALIZED) { initializeState(); } - TestContext context = createContext(fn); + TestContext context = new TestContext(); context.setupDelegateAggregators(); // State and timer internals are per-bundle. stateInternals = InMemoryStateInternals.forKey(new Object()); @@ -262,7 +270,7 @@ public class DoFnTester<InputT, OutputT> implements AutoCloseable { /** * Calls the {@link DoFn.ProcessElement} method on the {@link DoFn} under test, in a * context where {@link DoFn.ProcessContext#element} returns the - * given element. + * given element and the element is in the global window. * * <p>Will call {@link #startBundle} automatically, if it hasn't * already been called. 
@@ -277,26 +285,86 @@ public class DoFnTester<InputT, OutputT> implements AutoCloseable { /** * Calls {@link DoFn.ProcessElement} on the {@code DoFn} under test, in a * context where {@link DoFn.ProcessContext#element} returns the - * given element and timestamp. + * given element and timestamp and the element is in the global window. * * <p>Will call {@link #startBundle} automatically, if it hasn't * already been called. - * - * <p>If the input timestamp is {@literal null}, the minimum timestamp will be used. */ public void processTimestampedElement(TimestampedValue<InputT> element) throws Exception { checkNotNull(element, "Timestamped element cannot be null"); + processWindowedElement( + element.getValue(), element.getTimestamp(), GlobalWindow.INSTANCE); + } + + /** + * Calls {@link DoFn.ProcessElement} on the {@code DoFn} under test, in a + * context where {@link DoFn.ProcessContext#element} returns the + * given element and timestamp and the element is in the given window. + * + * <p>Will call {@link #startBundle} automatically, if it hasn't + * already been called. 
+ */ + public void processWindowedElement( + InputT element, Instant timestamp, final BoundedWindow window) throws Exception { if (state != State.BUNDLE_STARTED) { startBundle(); } try { - final TestProcessContext processContext = createProcessContext(element); - fnInvoker.invokeProcessElement(new DoFnInvoker.FakeArgumentProvider<InputT, OutputT>() { - @Override - public DoFn<InputT, OutputT>.ProcessContext processContext(DoFn<InputT, OutputT> doFn) { - return processContext; - } - }); + final TestProcessContext processContext = + new TestProcessContext( + ValueInSingleWindow.of(element, timestamp, window, PaneInfo.NO_FIRING)); + fnInvoker.invokeProcessElement( + new DoFnInvoker.ArgumentProvider<InputT, OutputT>() { + @Override + public BoundedWindow window() { + return window; + } + + @Override + public DoFn<InputT, OutputT>.Context context(DoFn<InputT, OutputT> doFn) { + throw new UnsupportedOperationException( + "Not expected to access DoFn.Context from @ProcessElement"); + } + + @Override + public DoFn<InputT, OutputT>.ProcessContext processContext(DoFn<InputT, OutputT> doFn) { + return processContext; + } + + @Override + public DoFn.InputProvider<InputT> inputProvider() { + throw new UnsupportedOperationException( + "Not expected to access InputProvider from DoFnTester"); + } + + @Override + public DoFn.OutputReceiver<OutputT> outputReceiver() { + throw new UnsupportedOperationException( + "Not expected to access OutputReceiver from DoFnTester"); + } + + @Override + public WindowingInternals<InputT, OutputT> windowingInternals() { + throw new UnsupportedOperationException( + "Not expected to access WindowingInternals from a new DoFn"); + } + + @Override + public <RestrictionT> RestrictionTracker<RestrictionT> restrictionTracker() { + throw new UnsupportedOperationException( + "Not expected to access RestrictionTracker from a regular DoFn in DoFnTester"); + } + + @Override + public org.apache.beam.sdk.util.state.State state(String stateId) { + throw new 
UnsupportedOperationException("DoFnTester doesn't support state yet"); + } + + @Override + public Timer timer(String timerId) { + throw new UnsupportedOperationException("DoFnTester doesn't support timers yet"); + } + }); } catch (UserCodeException e) { unwrapUserCodeException(e); } @@ -318,7 +386,7 @@ public class DoFnTester<InputT, OutputT> implements AutoCloseable { "Must be inside bundle to call finishBundle, but was: %s", state); try { - fnInvoker.invokeFinishBundle(createContext(fn)); + fnInvoker.invokeFinishBundle(new TestContext()); } catch (UserCodeException e) { unwrapUserCodeException(e); } @@ -543,10 +611,6 @@ public class DoFnTester<InputT, OutputT> implements AutoCloseable { return mainOutputTag; } - private TestContext createContext(DoFn<InputT, OutputT> fn) { - return new TestContext(); - } - private class TestContext extends DoFn<InputT, OutputT>.Context { TestContext() { fn.super(); @@ -559,12 +623,27 @@ public class DoFnTester<InputT, OutputT> implements AutoCloseable { @Override public void output(OutputT output) { - sideOutput(mainOutputTag, output); + throwUnsupportedOutputFromBundleMethods(); } @Override public void outputWithTimestamp(OutputT output, Instant timestamp) { - sideOutputWithTimestamp(mainOutputTag, output, timestamp); + throwUnsupportedOutputFromBundleMethods(); + } + + @Override + public <T> void sideOutputWithTimestamp(TupleTag<T> tag, T output, Instant timestamp) { + throwUnsupportedOutputFromBundleMethods(); + } + + @Override + public <T> void sideOutput(TupleTag<T> tag, T output) { + throwUnsupportedOutputFromBundleMethods(); + } + + private void throwUnsupportedOutputFromBundleMethods() { + throw new UnsupportedOperationException( + "DoFnTester doesn't support output from bundle methods"); } @Override @@ -613,26 +692,6 @@ public class DoFnTester<InputT, OutputT> implements AutoCloseable { } return aggregator; } - - @Override - public <T> void sideOutputWithTimestamp(TupleTag<T> tag, T output, Instant timestamp) { - - } - - 
@Override - public <T> void sideOutput(TupleTag<T> tag, T output) { - sideOutputWithTimestamp(tag, output, BoundedWindow.TIMESTAMP_MIN_VALUE); - } - - public <T> void noteOutput(TupleTag<T> tag, ValueInSingleWindow<T> output) { - getMutableOutput(tag).add(output); - } - } - - private TestProcessContext createProcessContext(TimestampedValue<InputT> elem) { - return new TestProcessContext( - ValueInSingleWindow.of( - elem.getValue(), elem.getTimestamp(), GlobalWindow.INSTANCE, PaneInfo.NO_FIRING)); } private class TestProcessContext extends DoFn<InputT, OutputT>.ProcessContext { @@ -641,7 +700,7 @@ public class DoFnTester<InputT, OutputT> implements AutoCloseable { private TestProcessContext(ValueInSingleWindow<InputT> element) { fn.super(); - this.context = createContext(fn); + this.context = new TestContext(); this.element = element; } @@ -699,8 +758,8 @@ public class DoFnTester<InputT, OutputT> implements AutoCloseable { @Override public <T> void sideOutputWithTimestamp(TupleTag<T> tag, T output, Instant timestamp) { - context.noteOutput( - tag, ValueInSingleWindow.of(output, timestamp, element.getWindow(), element.getPane())); + getMutableOutput(tag) + .add(ValueInSingleWindow.of(output, timestamp, element.getWindow(), element.getPane())); } @Override @@ -772,6 +831,29 @@ public class DoFnTester<InputT, OutputT> implements AutoCloseable { private DoFnTester(DoFn<InputT, OutputT> origFn) { this.origFn = origFn; + DoFnSignature signature = DoFnSignatures.signatureForDoFn(origFn); + for (DoFnSignature.Parameter param : signature.processElement().extraParameters()) { + param.match( + new DoFnSignature.Parameter.Cases.WithDefault<Void>() { + @Override + public Void dispatch(DoFnSignature.Parameter.ProcessContextParameter p) { + // ProcessContext parameter is obviously supported. + return null; + } + + @Override + public Void dispatch(DoFnSignature.Parameter.WindowParameter p) { + // We also support the BoundedWindow parameter. 
+ return null; + } + + @Override + protected Void dispatchDefault(DoFnSignature.Parameter p) { + throw new UnsupportedOperationException( + "Parameter " + p + " not supported by DoFnTester"); + } + }); + } } @SuppressWarnings("unchecked") http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/78ac009b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/DoFnTesterTest.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/DoFnTesterTest.java b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/DoFnTesterTest.java index ff8a9bc..b47465e 100644 --- a/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/DoFnTesterTest.java +++ b/sdks/java/core/src/test/java/org/apache/beam/sdk/transforms/DoFnTesterTest.java @@ -30,13 +30,16 @@ import java.util.List; import java.util.concurrent.atomic.AtomicInteger; import org.apache.beam.sdk.coders.VarIntCoder; import org.apache.beam.sdk.testing.TestPipeline; +import org.apache.beam.sdk.transforms.windowing.BoundedWindow; import org.apache.beam.sdk.transforms.windowing.GlobalWindow; import org.apache.beam.sdk.transforms.windowing.IntervalWindow; import org.apache.beam.sdk.util.PCollectionViews; import org.apache.beam.sdk.util.WindowingStrategy; +import org.apache.beam.sdk.values.KV; import org.apache.beam.sdk.values.PCollectionView; import org.apache.beam.sdk.values.TimestampedValue; import org.hamcrest.Matchers; +import org.joda.time.Duration; import org.joda.time.Instant; import org.junit.Rule; import org.junit.Test; @@ -350,6 +353,37 @@ public class DoFnTesterTest { } } + @Test + public void testSupportsWindowParameter() throws Exception { + Instant now = Instant.now(); + try (DoFnTester<Integer, KV<Integer, BoundedWindow>> tester = + DoFnTester.of(new DoFnWithWindowParameter())) { + BoundedWindow firstWindow = new IntervalWindow(now, now.plus(Duration.standardMinutes(1))); + tester.processWindowedElement(1, now, 
firstWindow); + tester.processWindowedElement(2, now, firstWindow); + BoundedWindow secondWindow = new IntervalWindow(now, now.plus(Duration.standardMinutes(4))); + tester.processWindowedElement(3, now, secondWindow); + tester.finishBundle(); + + assertThat( + tester.peekOutputElementsInWindow(firstWindow), + containsInAnyOrder( + TimestampedValue.of(KV.of(1, firstWindow), now), + TimestampedValue.of(KV.of(2, firstWindow), now))); + assertThat( + tester.peekOutputElementsInWindow(secondWindow), + containsInAnyOrder( + TimestampedValue.of(KV.of(3, secondWindow), now))); + } + } + + private static class DoFnWithWindowParameter extends DoFn<Integer, KV<Integer, BoundedWindow>> { + @ProcessElement + public void processElement(ProcessContext c, BoundedWindow window) { + c.output(KV.of(c.element(), window)); + } + } + private static class SideInputDoFn extends DoFn<Integer, Integer> { private final PCollectionView<Integer> value;