Repository: beam Updated Branches: refs/heads/master 224e44765 -> f4e109767
[BEAM-65] ProcessFn: support setup/teardown Previously, ProcessFn did not explicitly invoke the underlying fn's @Setup and @Teardown methods; it assumed that those methods would be invoked on that fn externally. This was true in the direct runner, but is not necessarily true in other runners: e.g., the Dataflow runner serializes the whole ProcessFn and treats it mostly as a regular DoFn, so it makes more sense to have the lifecycle methods of ProcessFn delegate to the underlying fn. This commit also adds a getter for fn (a runner may need it to create a proper ProcessContext when creating the SplittableProcessElementInvoker). Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/b6877ce1 Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/b6877ce1 Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/b6877ce1 Branch: refs/heads/master Commit: b6877ce1739650e7e7593c1d2c2858d60d2393fb Parents: 4ccbdbc Author: Eugene Kirpichov <kirpic...@google.com> Authored: Wed Feb 1 16:07:44 2017 -0800 Committer: Eugene Kirpichov <kirpic...@google.com> Committed: Fri Feb 3 10:41:47 2017 -0800 ---------------------------------------------------------------------- .../beam/runners/core/SplittableParDo.java | 16 ++++- .../beam/runners/core/SplittableParDoTest.java | 73 +++++++++++++++++++- .../runners/direct/ParDoEvaluatorFactory.java | 5 +- ...littableProcessElementsEvaluatorFactory.java | 11 ++- 4 files changed, 96 insertions(+), 9 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/beam/blob/b6877ce1/runners/core-java/src/main/java/org/apache/beam/runners/core/SplittableParDo.java ---------------------------------------------------------------------- diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/SplittableParDo.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/SplittableParDo.java index 78acb19..664f334 100644 --- 
a/runners/core-java/src/main/java/org/apache/beam/runners/core/SplittableParDo.java +++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/SplittableParDo.java @@ -375,7 +375,6 @@ public class SplittableParDo<InputT, OutputT, RestrictionT> Coder<RestrictionT> restrictionCoder, Coder<? extends BoundedWindow> windowCoder) { this.fn = fn; - this.invoker = DoFnInvokers.invokerFor(fn); this.windowCoder = windowCoder; this.elementTag = StateTags.value("element", WindowedValue.getFullCoder(elementCoder, this.windowCoder)); @@ -395,6 +394,21 @@ public class SplittableParDo<InputT, OutputT, RestrictionT> this.processElementInvoker = invoker; } + public DoFn<InputT, OutputT> getFn() { + return fn; + } + + @Setup + public void setup() throws Exception { + invoker = DoFnInvokers.invokerFor(fn); + invoker.invokeSetup(); + } + + @Teardown + public void tearDown() throws Exception { + invoker.invokeTeardown(); + } + @StartBundle public void startBundle(Context c) throws Exception { invoker.invokeStartBundle(wrapContext(c)); http://git-wip-us.apache.org/repos/asf/beam/blob/b6877ce1/runners/core-java/src/test/java/org/apache/beam/runners/core/SplittableParDoTest.java ---------------------------------------------------------------------- diff --git a/runners/core-java/src/test/java/org/apache/beam/runners/core/SplittableParDoTest.java b/runners/core-java/src/test/java/org/apache/beam/runners/core/SplittableParDoTest.java index bb7fd8c..96d65ae 100644 --- a/runners/core-java/src/test/java/org/apache/beam/runners/core/SplittableParDoTest.java +++ b/runners/core-java/src/test/java/org/apache/beam/runners/core/SplittableParDoTest.java @@ -200,7 +200,8 @@ public class SplittableParDoTest { * {@link DoFn.ProcessElement} calls). 
*/ private static class ProcessFnTester< - InputT, OutputT, RestrictionT, TrackerT extends RestrictionTracker<RestrictionT>> { + InputT, OutputT, RestrictionT, TrackerT extends RestrictionTracker<RestrictionT>> + implements AutoCloseable { private final DoFnTester< KeyedWorkItem<String, ElementAndRestriction<InputT, RestrictionT>>, OutputT> tester; @@ -270,6 +271,11 @@ public class SplittableParDoTest { this.currentProcessingTime = currentProcessingTime; } + @Override + public void close() throws Exception { + tester.close(); + } + /** Performs a seed {@link DoFn.ProcessElement} call feeding the element and restriction. */ void startElement(InputT element, RestrictionT restriction) throws Exception { startElement( @@ -633,4 +639,69 @@ public class SplittableParDoTest { Instant.now().getMillis() - base.getMillis(), greaterThanOrEqualTo(maxBundleDuration.getMillis())); } + + private static class LifecycleVerifyingFn extends DoFn<Integer, String> { + private enum State { + BEFORE_SETUP, + OUTSIDE_BUNDLE, + INSIDE_BUNDLE, + TORN_DOWN + } + + private State state = State.BEFORE_SETUP; + + @ProcessElement + public void process(ProcessContext c, SomeRestrictionTracker tracker) { + assertEquals(State.INSIDE_BUNDLE, state); + } + + @GetInitialRestriction + public SomeRestriction getInitialRestriction(Integer element) { + return new SomeRestriction(); + } + + @NewTracker + public SomeRestrictionTracker newTracker(SomeRestriction restriction) { + return new SomeRestrictionTracker(); + } + + @Setup + public void setup() { + assertEquals(State.BEFORE_SETUP, state); + state = State.OUTSIDE_BUNDLE; + } + + @Teardown + public void tearDown() { + assertEquals(State.OUTSIDE_BUNDLE, state); + state = State.TORN_DOWN; + } + + @StartBundle + public void startBundle(Context c) { + assertEquals(State.OUTSIDE_BUNDLE, state); + state = State.INSIDE_BUNDLE; + } + + @FinishBundle + public void finishBundle(Context c) { + assertEquals(State.INSIDE_BUNDLE, state); + state = 
State.OUTSIDE_BUNDLE; + } + } + + @Test + public void testInvokesLifecycleMethods() throws Exception { + DoFn<Integer, String> fn = new LifecycleVerifyingFn(); + try (ProcessFnTester<Integer, String, SomeRestriction, SomeRestrictionTracker> tester = + new ProcessFnTester<>( + Instant.now(), + fn, + BigEndianIntegerCoder.of(), + SerializableCoder.of(SomeRestriction.class), + MAX_OUTPUTS_PER_BUNDLE, + MAX_BUNDLE_DURATION)) { + tester.startElement(42, new SomeRestriction()); + } + } } http://git-wip-us.apache.org/repos/asf/beam/blob/b6877ce1/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoEvaluatorFactory.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoEvaluatorFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoEvaluatorFactory.java index b028766..7d6a8ea 100644 --- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoEvaluatorFactory.java +++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoEvaluatorFactory.java @@ -111,7 +111,7 @@ final class ParDoEvaluatorFactory<InputT, OutputT> implements TransformEvaluator .getExecutionContext(application, inputBundleKey) .getOrCreateStepContext(stepName, stepName); - DoFnLifecycleManager fnManager = getManagerForCloneOf(doFn); + DoFnLifecycleManager fnManager = fnClones.getUnchecked(doFn); return DoFnLifecycleManagerRemovingTransformEvaluator.wrapping( createParDoEvaluator( @@ -170,7 +170,4 @@ final class ParDoEvaluatorFactory<InputT, OutputT> implements TransformEvaluator return pcs; } - public DoFnLifecycleManager getManagerForCloneOf(DoFn<?, ?> fn) { - return fnClones.getUnchecked(fn); - } } http://git-wip-us.apache.org/repos/asf/beam/blob/b6877ce1/runners/direct-java/src/main/java/org/apache/beam/runners/direct/SplittableProcessElementsEvaluatorFactory.java 
---------------------------------------------------------------------- diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/SplittableProcessElementsEvaluatorFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/SplittableProcessElementsEvaluatorFactory.java index 64593cd..c57ece1 100644 --- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/SplittableProcessElementsEvaluatorFactory.java +++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/SplittableProcessElementsEvaluatorFactory.java @@ -82,10 +82,15 @@ class SplittableProcessElementsEvaluatorFactory< final SplittableParDo.ProcessElements<InputT, OutputT, RestrictionT, TrackerT> transform = application.getTransform(); - DoFnLifecycleManager fnManager = delegateFactory.getManagerForCloneOf(transform.getFn()); - SplittableParDo.ProcessFn<InputT, OutputT, RestrictionT, TrackerT> processFn = - transform.newProcessFn(fnManager.<InputT, OutputT>get()); + transform.newProcessFn(transform.getFn()); + + DoFnLifecycleManager fnManager = DoFnLifecycleManager.of(processFn); + processFn = + ((SplittableParDo.ProcessFn<InputT, OutputT, RestrictionT, TrackerT>) + fnManager + .<KeyedWorkItem<String, ElementAndRestriction<InputT, RestrictionT>>, OutputT> + get()); String stepName = evaluationContext.getStepName(application); final DirectExecutionContext.DirectStepContext stepContext =