Repository: incubator-beam Updated Branches: refs/heads/master da1b7556b -> 272493ed7
Refactor CompletionCallbacks The default and timerful completion callbacks are identical, excepting their calls to evaluationContext.commitResult; factor that code into a common location. Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/e7df160a Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/e7df160a Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/e7df160a Branch: refs/heads/master Commit: e7df160a2cde6dead6c4f7e0ec0aaa5e4808239d Parents: 9b9d73f Author: Thomas Groh <tg...@google.com> Authored: Tue May 3 13:22:13 2016 -0700 Committer: Thomas Groh <tg...@google.com> Committed: Tue May 3 14:27:22 2016 -0700 ---------------------------------------------------------------------- .../direct/ExecutorServiceParallelExecutor.java | 49 ++++++++++++-------- 1 file changed, 29 insertions(+), 20 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/e7df160a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ExecutorServiceParallelExecutor.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ExecutorServiceParallelExecutor.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ExecutorServiceParallelExecutor.java index 18af363..6f26b6b 100644 --- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ExecutorServiceParallelExecutor.java +++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ExecutorServiceParallelExecutor.java @@ -210,16 +210,20 @@ final class ExecutorServiceParallelExecutor implements InProcessExecutor { } /** - * The default {@link CompletionCallback}. The default completion callback is used to complete - * transform evaluations that are triggered due to the arrival of elements from an upstream - * transform, or for a source transform. + * The base implementation of {@link CompletionCallback} that provides implementations for + * {@link #handleResult(CommittedBundle, InProcessTransformResult)} and + * {@link #handleThrowable(CommittedBundle, Throwable)}, given an implementation of + * {@link #getCommittedResult(CommittedBundle, InProcessTransformResult)}. */ - private class DefaultCompletionCallback implements CompletionCallback { + private abstract class CompletionCallbackBase implements CompletionCallback { + protected abstract CommittedResult getCommittedResult( + CommittedBundle<?> inputBundle, + InProcessTransformResult result); + @Override - public CommittedResult handleResult( + public final CommittedResult handleResult( CommittedBundle<?> inputBundle, InProcessTransformResult result) { - CommittedResult committedResult = - evaluationContext.handleResult(inputBundle, Collections.<TimerData>emptyList(), result); + CommittedResult committedResult = getCommittedResult(inputBundle, result); for (CommittedBundle<?> outputBundle : committedResult.getOutputs()) { allUpdates.offer(ExecutorUpdate.fromBundle(outputBundle)); } @@ -233,12 +237,27 @@ final class ExecutorServiceParallelExecutor implements InProcessExecutor { } /** + * The default {@link CompletionCallback}. The default completion callback is used to complete + * transform evaluations that are triggered due to the arrival of elements from an upstream + * transform, or for a source transform. + */ + private class DefaultCompletionCallback extends CompletionCallbackBase { + @Override + public CommittedResult getCommittedResult( + CommittedBundle<?> inputBundle, InProcessTransformResult result) { + return evaluationContext.handleResult(inputBundle, + Collections.<TimerData>emptyList(), + result); + } + } + + /** * A {@link CompletionCallback} where the completed bundle was produced to deliver some collection * of {@link TimerData timers}. When the evaluator completes successfully, reports all of the * timers used to create the input to the {@link InProcessEvaluationContext evaluation context} * as part of the result. */ - private class TimerCompletionCallback implements CompletionCallback { + private class TimerCompletionCallback extends CompletionCallbackBase { private final Iterable<TimerData> timers; private TimerCompletionCallback(Iterable<TimerData> timers) { @@ -246,19 +265,9 @@ final class ExecutorServiceParallelExecutor implements InProcessExecutor { } @Override - public CommittedResult handleResult( + public CommittedResult getCommittedResult( CommittedBundle<?> inputBundle, InProcessTransformResult result) { - CommittedResult committedResult = - evaluationContext.handleResult(inputBundle, timers, result); - for (CommittedBundle<?> outputBundle : committedResult.getOutputs()) { - allUpdates.offer(ExecutorUpdate.fromBundle(outputBundle)); - } - return committedResult; - } - - @Override - public void handleThrowable(CommittedBundle<?> inputBundle, Throwable t) { - allUpdates.offer(ExecutorUpdate.fromThrowable(t)); + return evaluationContext.handleResult(inputBundle, timers, result); } }