Repository: incubator-beam
Updated Branches:
  refs/heads/master da1b7556b -> 272493ed7


Refactor CompletionCallbacks

The default and timerful completion callbacks are identical, excepting
their calls to evaluationContext.commitResult; factor that code into a
common location.


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/e7df160a
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/e7df160a
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/e7df160a

Branch: refs/heads/master
Commit: e7df160a2cde6dead6c4f7e0ec0aaa5e4808239d
Parents: 9b9d73f
Author: Thomas Groh <tg...@google.com>
Authored: Tue May 3 13:22:13 2016 -0700
Committer: Thomas Groh <tg...@google.com>
Committed: Tue May 3 14:27:22 2016 -0700

----------------------------------------------------------------------
 .../direct/ExecutorServiceParallelExecutor.java | 49 ++++++++++++--------
 1 file changed, 29 insertions(+), 20 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e7df160a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ExecutorServiceParallelExecutor.java
----------------------------------------------------------------------
diff --git 
a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ExecutorServiceParallelExecutor.java
 
b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ExecutorServiceParallelExecutor.java
index 18af363..6f26b6b 100644
--- 
a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ExecutorServiceParallelExecutor.java
+++ 
b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ExecutorServiceParallelExecutor.java
@@ -210,16 +210,20 @@ final class ExecutorServiceParallelExecutor implements 
InProcessExecutor {
   }
 
   /**
-   * The default {@link CompletionCallback}. The default completion callback 
is used to complete
-   * transform evaluations that are triggered due to the arrival of elements 
from an upstream
-   * transform, or for a source transform.
+   * The base implementation of {@link CompletionCallback} that provides 
implementations for
+   * {@link #handleResult(CommittedBundle, InProcessTransformResult)} and
+   * {@link #handleThrowable(CommittedBundle, Throwable)}, given an 
implementation of
+   * {@link #getCommittedResult(CommittedBundle, InProcessTransformResult)}.
    */
-  private class DefaultCompletionCallback implements CompletionCallback {
+  private abstract class CompletionCallbackBase implements CompletionCallback {
+    protected abstract CommittedResult getCommittedResult(
+        CommittedBundle<?> inputBundle,
+        InProcessTransformResult result);
+
     @Override
-    public CommittedResult handleResult(
+    public final CommittedResult handleResult(
         CommittedBundle<?> inputBundle, InProcessTransformResult result) {
-      CommittedResult committedResult =
-          evaluationContext.handleResult(inputBundle, 
Collections.<TimerData>emptyList(), result);
+      CommittedResult committedResult = getCommittedResult(inputBundle, 
result);
       for (CommittedBundle<?> outputBundle : committedResult.getOutputs()) {
         allUpdates.offer(ExecutorUpdate.fromBundle(outputBundle));
       }
@@ -233,12 +237,27 @@ final class ExecutorServiceParallelExecutor implements 
InProcessExecutor {
   }
 
   /**
+   * The default {@link CompletionCallback}. The default completion callback 
is used to complete
+   * transform evaluations that are triggered due to the arrival of elements 
from an upstream
+   * transform, or for a source transform.
+   */
+  private class DefaultCompletionCallback extends CompletionCallbackBase {
+    @Override
+    public CommittedResult getCommittedResult(
+        CommittedBundle<?> inputBundle, InProcessTransformResult result) {
+      return evaluationContext.handleResult(inputBundle,
+          Collections.<TimerData>emptyList(),
+          result);
+    }
+  }
+
+  /**
    * A {@link CompletionCallback} where the completed bundle was produced to 
deliver some collection
    * of {@link TimerData timers}. When the evaluator completes successfully, 
reports all of the
    * timers used to create the input to the {@link InProcessEvaluationContext 
evaluation context}
    * as part of the result.
    */
-  private class TimerCompletionCallback implements CompletionCallback {
+  private class TimerCompletionCallback extends CompletionCallbackBase {
     private final Iterable<TimerData> timers;
 
     private TimerCompletionCallback(Iterable<TimerData> timers) {
@@ -246,19 +265,9 @@ final class ExecutorServiceParallelExecutor implements 
InProcessExecutor {
     }
 
     @Override
-    public CommittedResult handleResult(
+    public CommittedResult getCommittedResult(
         CommittedBundle<?> inputBundle, InProcessTransformResult result) {
-      CommittedResult committedResult =
-          evaluationContext.handleResult(inputBundle, timers, result);
-      for (CommittedBundle<?> outputBundle : committedResult.getOutputs()) {
-        allUpdates.offer(ExecutorUpdate.fromBundle(outputBundle));
-      }
-      return committedResult;
-    }
-
-    @Override
-    public void handleThrowable(CommittedBundle<?> inputBundle, Throwable t) {
-      allUpdates.offer(ExecutorUpdate.fromThrowable(t));
+          return evaluationContext.handleResult(inputBundle, timers, result);
     }
   }
 

Reply via email to