Move shared construction code to ParDoInProcessEvaluator. Remove duplicate code in ParDo(Single/Multi)EvaluatorFactory; instead, only extract the appropriate elements and pass them to the new ParDoInProcessEvaluator.create factory method.
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/64144259 Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/64144259 Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/64144259 Branch: refs/heads/master Commit: 64144259551f1cf627545e0329c5a0daf087e7d2 Parents: 7a5b7ad Author: Thomas Groh <tg...@google.com> Authored: Tue Mar 29 17:38:22 2016 -0700 Committer: Dan Halperin <dhalp...@google.com> Committed: Fri Apr 15 16:11:32 2016 -0700 ---------------------------------------------------------------------- .../inprocess/ParDoInProcessEvaluator.java | 50 +++++++++++++++++- .../inprocess/ParDoMultiEvaluatorFactory.java | 55 +++++--------------- .../inprocess/ParDoSingleEvaluatorFactory.java | 52 +++++------------- 3 files changed, 75 insertions(+), 82 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/64144259/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/inprocess/ParDoInProcessEvaluator.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/inprocess/ParDoInProcessEvaluator.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/inprocess/ParDoInProcessEvaluator.java index a68fa53..7365527 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/inprocess/ParDoInProcessEvaluator.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/inprocess/ParDoInProcessEvaluator.java @@ -18,15 +18,19 @@ package org.apache.beam.sdk.runners.inprocess; import org.apache.beam.sdk.runners.inprocess.InProcessExecutionContext.InProcessStepContext; +import org.apache.beam.sdk.runners.inprocess.InProcessPipelineRunner.CommittedBundle; import org.apache.beam.sdk.runners.inprocess.InProcessPipelineRunner.UncommittedBundle; import 
org.apache.beam.sdk.transforms.AppliedPTransform; +import org.apache.beam.sdk.transforms.DoFn; import org.apache.beam.sdk.util.DoFnRunner; +import org.apache.beam.sdk.util.DoFnRunners; import org.apache.beam.sdk.util.DoFnRunners.OutputManager; import org.apache.beam.sdk.util.UserCodeException; import org.apache.beam.sdk.util.WindowedValue; import org.apache.beam.sdk.util.common.CounterSet; import org.apache.beam.sdk.util.state.CopyOnAccessInMemoryStateInternals; import org.apache.beam.sdk.values.PCollection; +import org.apache.beam.sdk.values.PCollectionView; import org.apache.beam.sdk.values.TupleTag; import java.util.ArrayList; @@ -36,13 +40,57 @@ import java.util.List; import java.util.Map; class ParDoInProcessEvaluator<T> implements TransformEvaluator<T> { + public static <InputT, OutputT> ParDoInProcessEvaluator<InputT> create( + InProcessEvaluationContext evaluationContext, + CommittedBundle<InputT> inputBundle, + AppliedPTransform<PCollection<InputT>, ?, ?> application, + DoFn<InputT, OutputT> fn, + List<PCollectionView<?>> sideInputs, + TupleTag<OutputT> mainOutputTag, + List<TupleTag<?>> sideOutputTags, + Map<TupleTag<?>, PCollection<?>> outputs) { + InProcessExecutionContext executionContext = + evaluationContext.getExecutionContext(application, inputBundle.getKey()); + String stepName = evaluationContext.getStepName(application); + InProcessStepContext stepContext = + executionContext.getOrCreateStepContext(stepName, stepName, null); + + CounterSet counters = evaluationContext.createCounterSet(); + + Map<TupleTag<?>, UncommittedBundle<?>> outputBundles = new HashMap<>(); + for (Map.Entry<TupleTag<?>, PCollection<?>> outputEntry : outputs.entrySet()) { + outputBundles.put( + outputEntry.getKey(), + evaluationContext.createBundle(inputBundle, outputEntry.getValue())); + } + + DoFnRunner<InputT, OutputT> runner = + DoFnRunners.createDefault( + evaluationContext.getPipelineOptions(), + fn, + evaluationContext.createSideInputReader(sideInputs), + 
BundleOutputManager.create(outputBundles), + mainOutputTag, + sideOutputTags, + stepContext, + counters.getAddCounterMutator(), + application.getInput().getWindowingStrategy()); + + runner.startBundle(); + + return new ParDoInProcessEvaluator<>( + runner, application, counters, outputBundles.values(), stepContext); + } + + //////////////////////////////////////////////////////////////////////////////////////////////// + private final DoFnRunner<T, ?> fnRunner; private final AppliedPTransform<PCollection<T>, ?, ?> transform; private final CounterSet counters; private final Collection<UncommittedBundle<?>> outputBundles; private final InProcessStepContext stepContext; - public ParDoInProcessEvaluator( + private ParDoInProcessEvaluator( DoFnRunner<T, ?> fnRunner, AppliedPTransform<PCollection<T>, ?, ?> transform, CounterSet counters, http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/64144259/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/inprocess/ParDoMultiEvaluatorFactory.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/inprocess/ParDoMultiEvaluatorFactory.java b/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/inprocess/ParDoMultiEvaluatorFactory.java index 2b95574..299d3a8 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/inprocess/ParDoMultiEvaluatorFactory.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/inprocess/ParDoMultiEvaluatorFactory.java @@ -16,23 +16,15 @@ * limitations under the License. 
*/ package org.apache.beam.sdk.runners.inprocess; - -import org.apache.beam.sdk.runners.inprocess.InProcessExecutionContext.InProcessStepContext; import org.apache.beam.sdk.runners.inprocess.InProcessPipelineRunner.CommittedBundle; -import org.apache.beam.sdk.runners.inprocess.InProcessPipelineRunner.UncommittedBundle; -import org.apache.beam.sdk.runners.inprocess.ParDoInProcessEvaluator.BundleOutputManager; import org.apache.beam.sdk.transforms.AppliedPTransform; import org.apache.beam.sdk.transforms.DoFn; import org.apache.beam.sdk.transforms.PTransform; import org.apache.beam.sdk.transforms.ParDo.BoundMulti; -import org.apache.beam.sdk.util.DoFnRunner; -import org.apache.beam.sdk.util.DoFnRunners; -import org.apache.beam.sdk.util.common.CounterSet; import org.apache.beam.sdk.values.PCollection; import org.apache.beam.sdk.values.PCollectionTuple; import org.apache.beam.sdk.values.TupleTag; -import java.util.HashMap; import java.util.Map; /** @@ -45,9 +37,9 @@ class ParDoMultiEvaluatorFactory implements TransformEvaluatorFactory { AppliedPTransform<?, ?, ?> application, CommittedBundle<?> inputBundle, InProcessEvaluationContext evaluationContext) { - @SuppressWarnings({"cast", "unchecked", "rawtypes"}) - TransformEvaluator<T> evaluator = (TransformEvaluator<T>) createMultiEvaluator( - (AppliedPTransform) application, inputBundle, evaluationContext); + @SuppressWarnings({"unchecked", "rawtypes"}) + TransformEvaluator<T> evaluator = + createMultiEvaluator((AppliedPTransform) application, inputBundle, evaluationContext); return evaluator; } @@ -55,38 +47,17 @@ class ParDoMultiEvaluatorFactory implements TransformEvaluatorFactory { AppliedPTransform<PCollection<InT>, PCollectionTuple, BoundMulti<InT, OuT>> application, CommittedBundle<InT> inputBundle, InProcessEvaluationContext evaluationContext) { - PCollectionTuple output = application.getOutput(); - Map<TupleTag<?>, PCollection<?>> outputs = output.getAll(); - Map<TupleTag<?>, UncommittedBundle<?>> outputBundles = 
new HashMap<>(); - for (Map.Entry<TupleTag<?>, PCollection<?>> outputEntry : outputs.entrySet()) { - outputBundles.put( - outputEntry.getKey(), - evaluationContext.createBundle(inputBundle, outputEntry.getValue())); - } - InProcessExecutionContext executionContext = - evaluationContext.getExecutionContext(application, inputBundle.getKey()); - String stepName = evaluationContext.getStepName(application); - InProcessStepContext stepContext = - executionContext.getOrCreateStepContext(stepName, stepName, null); - - CounterSet counters = evaluationContext.createCounterSet(); - + Map<TupleTag<?>, PCollection<?>> outputs = application.getOutput().getAll(); DoFn<InT, OuT> fn = application.getTransform().getFn(); - DoFnRunner<InT, OuT> runner = - DoFnRunners.createDefault( - evaluationContext.getPipelineOptions(), - fn, - evaluationContext.createSideInputReader(application.getTransform().getSideInputs()), - BundleOutputManager.create(outputBundles), - application.getTransform().getMainOutputTag(), - application.getTransform().getSideOutputTags().getAll(), - stepContext, - counters.getAddCounterMutator(), - application.getInput().getWindowingStrategy()); - - runner.startBundle(); - return new ParDoInProcessEvaluator<>( - runner, application, counters, outputBundles.values(), stepContext); + return ParDoInProcessEvaluator.create( + evaluationContext, + inputBundle, + application, + fn, + application.getTransform().getSideInputs(), + application.getTransform().getMainOutputTag(), + application.getTransform().getSideOutputTags().getAll(), + outputs); } } http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/64144259/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/inprocess/ParDoSingleEvaluatorFactory.java ---------------------------------------------------------------------- diff --git a/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/inprocess/ParDoSingleEvaluatorFactory.java 
b/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/inprocess/ParDoSingleEvaluatorFactory.java index 044b7e0..4d38448 100644 --- a/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/inprocess/ParDoSingleEvaluatorFactory.java +++ b/sdks/java/core/src/main/java/org/apache/beam/sdk/runners/inprocess/ParDoSingleEvaluatorFactory.java @@ -17,19 +17,15 @@ */ package org.apache.beam.sdk.runners.inprocess; -import org.apache.beam.sdk.runners.inprocess.InProcessExecutionContext.InProcessStepContext; import org.apache.beam.sdk.runners.inprocess.InProcessPipelineRunner.CommittedBundle; -import org.apache.beam.sdk.runners.inprocess.InProcessPipelineRunner.UncommittedBundle; -import org.apache.beam.sdk.runners.inprocess.ParDoInProcessEvaluator.BundleOutputManager; import org.apache.beam.sdk.transforms.AppliedPTransform; import org.apache.beam.sdk.transforms.PTransform; import org.apache.beam.sdk.transforms.ParDo.Bound; -import org.apache.beam.sdk.util.DoFnRunner; -import org.apache.beam.sdk.util.DoFnRunners; -import org.apache.beam.sdk.util.common.CounterSet; import org.apache.beam.sdk.values.PCollection; import org.apache.beam.sdk.values.TupleTag; +import com.google.common.collect.ImmutableMap; + import java.util.Collections; /** @@ -42,9 +38,9 @@ class ParDoSingleEvaluatorFactory implements TransformEvaluatorFactory { final AppliedPTransform<?, ?, ?> application, CommittedBundle<?> inputBundle, InProcessEvaluationContext evaluationContext) { - @SuppressWarnings({"cast", "unchecked", "rawtypes"}) - TransformEvaluator<T> evaluator = (TransformEvaluator<T>) createSingleEvaluator( - (AppliedPTransform) application, inputBundle, evaluationContext); + @SuppressWarnings({"unchecked", "rawtypes"}) + TransformEvaluator<T> evaluator = + createSingleEvaluator((AppliedPTransform) application, inputBundle, evaluationContext); return evaluator; } @@ -53,37 +49,15 @@ class ParDoSingleEvaluatorFactory implements TransformEvaluatorFactory { Bound<InputT, OutputT>> application, 
CommittedBundle<InputT> inputBundle, InProcessEvaluationContext evaluationContext) { TupleTag<OutputT> mainOutputTag = new TupleTag<>("out"); - UncommittedBundle<OutputT> outputBundle = - evaluationContext.createBundle(inputBundle, application.getOutput()); - - InProcessExecutionContext executionContext = - evaluationContext.getExecutionContext(application, inputBundle.getKey()); - String stepName = evaluationContext.getStepName(application); - InProcessStepContext stepContext = - executionContext.getOrCreateStepContext(stepName, stepName, null); - - CounterSet counters = evaluationContext.createCounterSet(); - - DoFnRunner<InputT, OutputT> runner = - DoFnRunners.createDefault( - evaluationContext.getPipelineOptions(), - application.getTransform().getFn(), - evaluationContext.createSideInputReader(application.getTransform().getSideInputs()), - BundleOutputManager.create( - Collections.<TupleTag<?>, UncommittedBundle<?>>singletonMap( - mainOutputTag, outputBundle)), - mainOutputTag, - Collections.<TupleTag<?>>emptyList(), - stepContext, - counters.getAddCounterMutator(), - application.getInput().getWindowingStrategy()); - runner.startBundle(); - return new ParDoInProcessEvaluator<InputT>( - runner, + return ParDoInProcessEvaluator.create( + evaluationContext, + inputBundle, application, - counters, - Collections.<UncommittedBundle<?>>singleton(outputBundle), - stepContext); + application.getTransform().getFn(), + application.getTransform().getSideInputs(), + mainOutputTag, + Collections.<TupleTag<?>>emptyList(), + ImmutableMap.<TupleTag<?>, PCollection<?>>of(mainOutputTag, application.getOutput())); } }