Repository: beam
Updated Branches:
  refs/heads/master 224e44765 -> f4e109767


[BEAM-65] ProcessFn: support setup/teardown

Previously, ProcessFn did not explicitly invoke the underlying fn's
@Setup and @Teardown methods - it assumed that those methods would
be invoked on that fn externally. This was true in the direct runner,
but is not necessarily true in other runners: e.g., the Dataflow runner
will serialize the whole ProcessFn and treat it mostly as a regular
DoFn, so it makes more sense to have the lifecycle methods of ProcessFn
delegate to the underlying fn.

Also adds a getter for fn (a runner may need it to create a proper
ProcessContext when creating the SplittableProcessElementInvoker).


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/b6877ce1
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/b6877ce1
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/b6877ce1

Branch: refs/heads/master
Commit: b6877ce1739650e7e7593c1d2c2858d60d2393fb
Parents: 4ccbdbc
Author: Eugene Kirpichov <kirpic...@google.com>
Authored: Wed Feb 1 16:07:44 2017 -0800
Committer: Eugene Kirpichov <kirpic...@google.com>
Committed: Fri Feb 3 10:41:47 2017 -0800

----------------------------------------------------------------------
 .../beam/runners/core/SplittableParDo.java      | 16 ++++-
 .../beam/runners/core/SplittableParDoTest.java  | 73 +++++++++++++++++++-
 .../runners/direct/ParDoEvaluatorFactory.java   |  5 +-
 ...littableProcessElementsEvaluatorFactory.java | 11 ++-
 4 files changed, 96 insertions(+), 9 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/b6877ce1/runners/core-java/src/main/java/org/apache/beam/runners/core/SplittableParDo.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/SplittableParDo.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/SplittableParDo.java
index 78acb19..664f334 100644
--- a/runners/core-java/src/main/java/org/apache/beam/runners/core/SplittableParDo.java
+++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/SplittableParDo.java
@@ -375,7 +375,6 @@ public class SplittableParDo<InputT, OutputT, RestrictionT>
         Coder<RestrictionT> restrictionCoder,
         Coder<? extends BoundedWindow> windowCoder) {
       this.fn = fn;
-      this.invoker = DoFnInvokers.invokerFor(fn);
       this.windowCoder = windowCoder;
       this.elementTag =
           StateTags.value("element", WindowedValue.getFullCoder(elementCoder, 
this.windowCoder));
@@ -395,6 +394,21 @@ public class SplittableParDo<InputT, OutputT, RestrictionT>
       this.processElementInvoker = invoker;
     }
 
+    public DoFn<InputT, OutputT> getFn() {
+      return fn;
+    }
+
+    @Setup
+    public void setup() throws Exception {
+      invoker = DoFnInvokers.invokerFor(fn);
+      invoker.invokeSetup();
+    }
+
+    @Teardown
+    public void tearDown() throws Exception {
+      invoker.invokeTeardown();
+    }
+
     @StartBundle
     public void startBundle(Context c) throws Exception {
       invoker.invokeStartBundle(wrapContext(c));

http://git-wip-us.apache.org/repos/asf/beam/blob/b6877ce1/runners/core-java/src/test/java/org/apache/beam/runners/core/SplittableParDoTest.java
----------------------------------------------------------------------
diff --git a/runners/core-java/src/test/java/org/apache/beam/runners/core/SplittableParDoTest.java b/runners/core-java/src/test/java/org/apache/beam/runners/core/SplittableParDoTest.java
index bb7fd8c..96d65ae 100644
--- a/runners/core-java/src/test/java/org/apache/beam/runners/core/SplittableParDoTest.java
+++ b/runners/core-java/src/test/java/org/apache/beam/runners/core/SplittableParDoTest.java
@@ -200,7 +200,8 @@ public class SplittableParDoTest {
    * {@link DoFn.ProcessElement} calls).
    */
   private static class ProcessFnTester<
-      InputT, OutputT, RestrictionT, TrackerT extends 
RestrictionTracker<RestrictionT>> {
+          InputT, OutputT, RestrictionT, TrackerT extends 
RestrictionTracker<RestrictionT>>
+      implements AutoCloseable {
     private final DoFnTester<
             KeyedWorkItem<String, ElementAndRestriction<InputT, 
RestrictionT>>, OutputT>
         tester;
@@ -270,6 +271,11 @@ public class SplittableParDoTest {
       this.currentProcessingTime = currentProcessingTime;
     }
 
+    @Override
+    public void close() throws Exception {
+      tester.close();
+    }
+
     /** Performs a seed {@link DoFn.ProcessElement} call feeding the element 
and restriction. */
     void startElement(InputT element, RestrictionT restriction) throws 
Exception {
       startElement(
@@ -633,4 +639,69 @@ public class SplittableParDoTest {
         Instant.now().getMillis() - base.getMillis(),
         greaterThanOrEqualTo(maxBundleDuration.getMillis()));
   }
+
+  private static class LifecycleVerifyingFn extends DoFn<Integer, String> {
+    private enum State {
+      BEFORE_SETUP,
+      OUTSIDE_BUNDLE,
+      INSIDE_BUNDLE,
+      TORN_DOWN
+    }
+
+    private State state = State.BEFORE_SETUP;
+
+    @ProcessElement
+    public void process(ProcessContext c, SomeRestrictionTracker tracker) {
+      assertEquals(State.INSIDE_BUNDLE, state);
+    }
+
+    @GetInitialRestriction
+    public SomeRestriction getInitialRestriction(Integer element) {
+      return new SomeRestriction();
+    }
+
+    @NewTracker
+    public SomeRestrictionTracker newTracker(SomeRestriction restriction) {
+      return new SomeRestrictionTracker();
+    }
+
+    @Setup
+    public void setup() {
+      assertEquals(State.BEFORE_SETUP, state);
+      state = State.OUTSIDE_BUNDLE;
+    }
+
+    @Teardown
+    public void tearDown() {
+      assertEquals(State.OUTSIDE_BUNDLE, state);
+      state = State.TORN_DOWN;
+    }
+
+    @StartBundle
+    public void startBundle(Context c) {
+      assertEquals(State.OUTSIDE_BUNDLE, state);
+      state = State.INSIDE_BUNDLE;
+    }
+
+    @FinishBundle
+    public void finishBundle(Context c) {
+      assertEquals(State.INSIDE_BUNDLE, state);
+      state = State.OUTSIDE_BUNDLE;
+    }
+  }
+
+  @Test
+  public void testInvokesLifecycleMethods() throws Exception {
+    DoFn<Integer, String> fn = new LifecycleVerifyingFn();
+    try (ProcessFnTester<Integer, String, SomeRestriction, 
SomeRestrictionTracker> tester =
+        new ProcessFnTester<>(
+            Instant.now(),
+            fn,
+            BigEndianIntegerCoder.of(),
+            SerializableCoder.of(SomeRestriction.class),
+            MAX_OUTPUTS_PER_BUNDLE,
+            MAX_BUNDLE_DURATION)) {
+      tester.startElement(42, new SomeRestriction());
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/b6877ce1/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoEvaluatorFactory.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoEvaluatorFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoEvaluatorFactory.java
index b028766..7d6a8ea 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoEvaluatorFactory.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoEvaluatorFactory.java
@@ -111,7 +111,7 @@ final class ParDoEvaluatorFactory<InputT, OutputT> 
implements TransformEvaluator
             .getExecutionContext(application, inputBundleKey)
             .getOrCreateStepContext(stepName, stepName);
 
-    DoFnLifecycleManager fnManager = getManagerForCloneOf(doFn);
+    DoFnLifecycleManager fnManager = fnClones.getUnchecked(doFn);
 
     return DoFnLifecycleManagerRemovingTransformEvaluator.wrapping(
         createParDoEvaluator(
@@ -170,7 +170,4 @@ final class ParDoEvaluatorFactory<InputT, OutputT> 
implements TransformEvaluator
     return pcs;
   }
 
-  public DoFnLifecycleManager getManagerForCloneOf(DoFn<?, ?> fn) {
-    return fnClones.getUnchecked(fn);
-  }
 }

http://git-wip-us.apache.org/repos/asf/beam/blob/b6877ce1/runners/direct-java/src/main/java/org/apache/beam/runners/direct/SplittableProcessElementsEvaluatorFactory.java
----------------------------------------------------------------------
diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/SplittableProcessElementsEvaluatorFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/SplittableProcessElementsEvaluatorFactory.java
index 64593cd..c57ece1 100644
--- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/SplittableProcessElementsEvaluatorFactory.java
+++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/SplittableProcessElementsEvaluatorFactory.java
@@ -82,10 +82,15 @@ class SplittableProcessElementsEvaluatorFactory<
     final SplittableParDo.ProcessElements<InputT, OutputT, RestrictionT, 
TrackerT> transform =
         application.getTransform();
 
-    DoFnLifecycleManager fnManager = 
delegateFactory.getManagerForCloneOf(transform.getFn());
-
     SplittableParDo.ProcessFn<InputT, OutputT, RestrictionT, TrackerT> 
processFn =
-        transform.newProcessFn(fnManager.<InputT, OutputT>get());
+        transform.newProcessFn(transform.getFn());
+
+    DoFnLifecycleManager fnManager = DoFnLifecycleManager.of(processFn);
+    processFn =
+        ((SplittableParDo.ProcessFn<InputT, OutputT, RestrictionT, TrackerT>)
+            fnManager
+                .<KeyedWorkItem<String, ElementAndRestriction<InputT, 
RestrictionT>>, OutputT>
+                    get());
 
     String stepName = evaluationContext.getStepName(application);
     final DirectExecutionContext.DirectStepContext stepContext =

Reply via email to