Creates ProcessFnRunner and wires it through ParDoEvaluator
Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/b93de58f Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/b93de58f Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/b93de58f Branch: refs/heads/DSL_SQL Commit: b93de58f5a3a10877997815a793725cb0e53cc2d Parents: 7e1a267 Author: Eugene Kirpichov <kirpic...@google.com> Authored: Mon Apr 17 14:52:23 2017 -0700 Committer: Eugene Kirpichov <kirpic...@google.com> Committed: Tue Apr 18 18:02:07 2017 -0700 ---------------------------------------------------------------------- .../apache/beam/runners/core/DoFnRunners.java | 32 +++++ .../beam/runners/core/ProcessFnRunner.java | 127 +++++++++++++++++++ .../beam/runners/direct/ParDoEvaluator.java | 114 +++++++++++++---- .../runners/direct/ParDoEvaluatorFactory.java | 11 +- ...littableProcessElementsEvaluatorFactory.java | 106 ++++++++++++---- .../direct/StatefulParDoEvaluatorFactory.java | 4 +- .../direct/TransformEvaluatorRegistry.java | 4 +- .../beam/runners/direct/ParDoEvaluatorTest.java | 3 +- 8 files changed, 341 insertions(+), 60 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/beam/blob/b93de58f/runners/core-java/src/main/java/org/apache/beam/runners/core/DoFnRunners.java ---------------------------------------------------------------------- diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/DoFnRunners.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/DoFnRunners.java index b09ee08..8501e72 100644 --- a/runners/core-java/src/main/java/org/apache/beam/runners/core/DoFnRunners.java +++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/DoFnRunners.java @@ -17,8 +17,10 @@ */ package org.apache.beam.runners.core; +import java.util.Collection; import java.util.List; import org.apache.beam.runners.core.ExecutionContext.StepContext; +import org.apache.beam.runners.core.SplittableParDo.ProcessFn; import org.apache.beam.runners.core.StatefulDoFnRunner.CleanupTimer; import org.apache.beam.runners.core.StatefulDoFnRunner.StateCleaner; import org.apache.beam.sdk.options.PipelineOptions; @@ -26,10 +28,12 @@ import org.apache.beam.sdk.transforms.Aggregator; import org.apache.beam.sdk.transforms.DoFn; import org.apache.beam.sdk.transforms.Sum; import org.apache.beam.sdk.transforms.windowing.BoundedWindow; +import org.apache.beam.sdk.util.ReadyCheckingSideInputReader; import org.apache.beam.sdk.util.SideInputReader; import org.apache.beam.sdk.util.WindowedValue; import org.apache.beam.sdk.util.WindowingStrategy; import org.apache.beam.sdk.values.KV; +import org.apache.beam.sdk.values.PCollectionView; import org.apache.beam.sdk.values.TupleTag; /** @@ -146,4 +150,32 @@ public class DoFnRunners { stateCleaner, droppedDueToLateness); } + + public static <InputT, OutputT, RestrictionT> + ProcessFnRunner<InputT, OutputT, RestrictionT> + newProcessFnRunner( + ProcessFn<InputT, OutputT, RestrictionT, ?> fn, + PipelineOptions options, + Collection<PCollectionView<?>> views, + ReadyCheckingSideInputReader sideInputReader, + OutputManager outputManager, + TupleTag<OutputT> mainOutputTag, + List<TupleTag<?>> additionalOutputTags, + StepContext stepContext, + AggregatorFactory aggregatorFactory, + WindowingStrategy<?, ?> windowingStrategy) { + return new ProcessFnRunner<>( + simpleRunner( + options, + fn, + sideInputReader, + outputManager, + mainOutputTag, + additionalOutputTags, + stepContext, + aggregatorFactory, + windowingStrategy), + views, + sideInputReader); + } } http://git-wip-us.apache.org/repos/asf/beam/blob/b93de58f/runners/core-java/src/main/java/org/apache/beam/runners/core/ProcessFnRunner.java ---------------------------------------------------------------------- diff --git a/runners/core-java/src/main/java/org/apache/beam/runners/core/ProcessFnRunner.java b/runners/core-java/src/main/java/org/apache/beam/runners/core/ProcessFnRunner.java new file mode 100644 index 0000000..3ae3f50 --- /dev/null +++ b/runners/core-java/src/main/java/org/apache/beam/runners/core/ProcessFnRunner.java @@ -0,0 +1,127 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.runners.core; + +import static com.google.common.base.Preconditions.checkArgument; +import static org.apache.beam.runners.core.SplittableParDo.ProcessFn; + +import com.google.common.collect.Iterables; +import java.util.Collection; +import java.util.Collections; +import org.apache.beam.runners.core.StateNamespaces.WindowNamespace; +import org.apache.beam.runners.core.TimerInternals.TimerData; +import org.apache.beam.sdk.transforms.windowing.BoundedWindow; +import org.apache.beam.sdk.transforms.windowing.GlobalWindow; +import org.apache.beam.sdk.util.ReadyCheckingSideInputReader; +import org.apache.beam.sdk.util.TimeDomain; +import org.apache.beam.sdk.util.WindowedValue; +import org.apache.beam.sdk.values.PCollectionView; +import org.joda.time.Instant; + +/** Runs a {@link ProcessFn} by constructing the appropriate contexts and passing them in. */ +public class ProcessFnRunner<InputT, OutputT, RestrictionT> + implements PushbackSideInputDoFnRunner< + KeyedWorkItem<String, ElementAndRestriction<InputT, RestrictionT>>, OutputT> { + private final DoFnRunner< + KeyedWorkItem<String, ElementAndRestriction<InputT, RestrictionT>>, OutputT> + underlying; + private final Collection<PCollectionView<?>> views; + private final ReadyCheckingSideInputReader sideInputReader; + + ProcessFnRunner( + DoFnRunner<KeyedWorkItem<String, ElementAndRestriction<InputT, RestrictionT>>, OutputT> + underlying, + Collection<PCollectionView<?>> views, + ReadyCheckingSideInputReader sideInputReader) { + this.underlying = underlying; + this.views = views; + this.sideInputReader = sideInputReader; + } + + @Override + public void startBundle() { + underlying.startBundle(); + } + + @Override + public Iterable<WindowedValue<KeyedWorkItem<String, ElementAndRestriction<InputT, RestrictionT>>>> + processElementInReadyWindows( + WindowedValue<KeyedWorkItem<String, ElementAndRestriction<InputT, RestrictionT>>> + windowedKWI) { + checkTrivialOuterWindows(windowedKWI); + BoundedWindow window = getUnderlyingWindow(windowedKWI.getValue()); + if (!isReady(window)) { + return Collections.singletonList(windowedKWI); + } + underlying.processElement(windowedKWI); + return Collections.emptyList(); + } + + @Override + public void finishBundle() { + underlying.finishBundle(); + } + + @Override + public void onTimer( + String timerId, BoundedWindow window, Instant timestamp, TimeDomain timeDomain) { + throw new UnsupportedOperationException("User timers unsupported in ProcessFn"); + } + + private static <T> void checkTrivialOuterWindows( + WindowedValue<KeyedWorkItem<String, T>> windowedKWI) { + // In practice it will be in 0 or 1 windows (ValueInEmptyWindows or ValueInGlobalWindow) + Collection<? extends BoundedWindow> outerWindows = windowedKWI.getWindows(); + if (!outerWindows.isEmpty()) { + checkArgument( + outerWindows.size() == 1, + "The KeyedWorkItem itself must not be in multiple windows, but was in: %s", + outerWindows); + BoundedWindow onlyWindow = Iterables.getOnlyElement(outerWindows); + checkArgument( + onlyWindow instanceof GlobalWindow, + "KeyedWorkItem must be in the Global window, but was in: %s", + onlyWindow); + } + } + + private static <T> BoundedWindow getUnderlyingWindow(KeyedWorkItem<String, T> kwi) { + if (Iterables.isEmpty(kwi.elementsIterable())) { + // ProcessFn sets only a single timer. + TimerData timer = Iterables.getOnlyElement(kwi.timersIterable()); + return ((WindowNamespace) timer.getNamespace()).getWindow(); + } else { + // KWI must have a single element in elementsIterable, because it follows a GBK by a + // uniquely generated key. + // Additionally, windows must be exploded before GBKIntoKeyedWorkItems, so there's also + // only a single window. + WindowedValue<T> value = Iterables.getOnlyElement(kwi.elementsIterable()); + return Iterables.getOnlyElement(value.getWindows()); + } + } + + private boolean isReady(BoundedWindow mainInputWindow) { + for (PCollectionView<?> view : views) { + BoundedWindow sideInputWindow = view.getWindowMappingFn().getSideInputWindow(mainInputWindow); + if (!sideInputReader.isReady(view, sideInputWindow)) { + return false; + } + } + return true; + } +} http://git-wip-us.apache.org/repos/asf/beam/blob/b93de58f/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoEvaluator.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoEvaluator.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoEvaluator.java index bab7b2c..cab11db 100644 --- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoEvaluator.java +++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoEvaluator.java @@ -30,6 +30,7 @@ import org.apache.beam.runners.core.SimplePushbackSideInputDoFnRunner; import org.apache.beam.runners.core.TimerInternals.TimerData; import org.apache.beam.runners.direct.DirectExecutionContext.DirectStepContext; import org.apache.beam.runners.direct.DirectRunner.UncommittedBundle; +import org.apache.beam.sdk.options.PipelineOptions; import org.apache.beam.sdk.transforms.AppliedPTransform; import org.apache.beam.sdk.transforms.DoFn; import org.apache.beam.sdk.transforms.windowing.BoundedWindow; @@ -43,6 +44,50 @@ import org.apache.beam.sdk.values.TupleTag; class ParDoEvaluator<InputT> implements TransformEvaluator<InputT> { + public interface DoFnRunnerFactory<InputT, OutputT> { + PushbackSideInputDoFnRunner<InputT, OutputT> createRunner( + PipelineOptions options, + DoFn<InputT, OutputT> fn, + List<PCollectionView<?>> sideInputs, + ReadyCheckingSideInputReader sideInputReader, + OutputManager outputManager, + TupleTag<OutputT> mainOutputTag, + List<TupleTag<?>> additionalOutputTags, + DirectStepContext stepContext, + AggregatorContainer.Mutator aggregatorChanges, + WindowingStrategy<?, ? extends BoundedWindow> windowingStrategy); + } + + public static <InputT, OutputT> DoFnRunnerFactory<InputT, OutputT> defaultRunnerFactory() { + return new DoFnRunnerFactory<InputT, OutputT>() { + @Override + public PushbackSideInputDoFnRunner<InputT, OutputT> createRunner( + PipelineOptions options, + DoFn<InputT, OutputT> fn, + List<PCollectionView<?>> sideInputs, + ReadyCheckingSideInputReader sideInputReader, + OutputManager outputManager, + TupleTag<OutputT> mainOutputTag, + List<TupleTag<?>> additionalOutputTags, + DirectStepContext stepContext, + AggregatorContainer.Mutator aggregatorChanges, + WindowingStrategy<?, ? extends BoundedWindow> windowingStrategy) { + DoFnRunner<InputT, OutputT> underlying = + DoFnRunners.simpleRunner( + options, + fn, + sideInputReader, + outputManager, + mainOutputTag, + additionalOutputTags, + stepContext, + aggregatorChanges, + windowingStrategy); + return SimplePushbackSideInputDoFnRunner.create(underlying, sideInputs, sideInputReader); + } + }; + } + public static <InputT, OutputT> ParDoEvaluator<InputT> create( EvaluationContext evaluationContext, DirectStepContext stepContext, @@ -53,9 +98,43 @@ class ParDoEvaluator<InputT> implements TransformEvaluator<InputT> { List<PCollectionView<?>> sideInputs, TupleTag<OutputT> mainOutputTag, List<TupleTag<?>> additionalOutputTags, - Map<TupleTag<?>, PCollection<?>> outputs) { + Map<TupleTag<?>, PCollection<?>> outputs, + DoFnRunnerFactory<InputT, OutputT> runnerFactory) { AggregatorContainer.Mutator aggregatorChanges = evaluationContext.getAggregatorMutator(); + BundleOutputManager outputManager = createOutputManager(evaluationContext, key, outputs); + + ReadyCheckingSideInputReader sideInputReader = + evaluationContext.createSideInputReader(sideInputs); + + PushbackSideInputDoFnRunner<InputT, OutputT> runner = runnerFactory.createRunner( + evaluationContext.getPipelineOptions(), + fn, + sideInputs, + sideInputReader, + outputManager, + mainOutputTag, + additionalOutputTags, + stepContext, + aggregatorChanges, + windowingStrategy); + + return create(runner, stepContext, application, aggregatorChanges, outputManager); + } + + public static <InputT, OutputT> ParDoEvaluator<InputT> create( + PushbackSideInputDoFnRunner<InputT, OutputT> runner, + DirectStepContext stepContext, + AppliedPTransform<?, ?, ?> application, + AggregatorContainer.Mutator aggregatorChanges, + BundleOutputManager outputManager) { + return new ParDoEvaluator<>(runner, application, aggregatorChanges, outputManager, stepContext); + } + + static BundleOutputManager createOutputManager( + EvaluationContext evaluationContext, + StructuralKey<?> key, + Map<TupleTag<?>, PCollection<?>> outputs) { Map<TupleTag<?>, UncommittedBundle<?>> outputBundles = new HashMap<>(); for (Map.Entry<TupleTag<?>, PCollection<?>> outputEntry : outputs.entrySet()) { // Just trust the context's decision as to whether the output should be keyed. @@ -69,32 +148,7 @@ class ParDoEvaluator<InputT> implements TransformEvaluator<InputT> { outputEntry.getKey(), evaluationContext.createBundle(outputEntry.getValue())); } } - BundleOutputManager outputManager = BundleOutputManager.create(outputBundles); - - ReadyCheckingSideInputReader sideInputReader = - evaluationContext.createSideInputReader(sideInputs); - - DoFnRunner<InputT, OutputT> underlying = - DoFnRunners.simpleRunner( - evaluationContext.getPipelineOptions(), - fn, - sideInputReader, - outputManager, - mainOutputTag, - additionalOutputTags, - stepContext, - aggregatorChanges, - windowingStrategy); - PushbackSideInputDoFnRunner<InputT, OutputT> runner = - SimplePushbackSideInputDoFnRunner.create(underlying, sideInputs, sideInputReader); - - try { - runner.startBundle(); - } catch (Exception e) { - throw UserCodeException.wrap(e); - } - - return new ParDoEvaluator<>(runner, application, aggregatorChanges, outputManager, stepContext); + return BundleOutputManager.create(outputBundles); } //////////////////////////////////////////////////////////////////////////////////////////////// @@ -119,6 +173,12 @@ class ParDoEvaluator<InputT> implements TransformEvaluator<InputT> { this.stepContext = stepContext; this.aggregatorChanges = aggregatorChanges; this.unprocessedElements = ImmutableList.builder(); + + try { + fnRunner.startBundle(); + } catch (Exception e) { + throw UserCodeException.wrap(e); + } } public BundleOutputManager getOutputManager() { http://git-wip-us.apache.org/repos/asf/beam/blob/b93de58f/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoEvaluatorFactory.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoEvaluatorFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoEvaluatorFactory.java index 93f204a..b00c2b6 100644 --- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoEvaluatorFactory.java +++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/ParDoEvaluatorFactory.java @@ -43,9 +43,13 @@ final class ParDoEvaluatorFactory<InputT, OutputT> implements TransformEvaluator private static final Logger LOG = LoggerFactory.getLogger(ParDoEvaluatorFactory.class); private final LoadingCache<DoFn<?, ?>, DoFnLifecycleManager> fnClones; private final EvaluationContext evaluationContext; + private final ParDoEvaluator.DoFnRunnerFactory<InputT, OutputT> runnerFactory; - ParDoEvaluatorFactory(EvaluationContext evaluationContext) { + ParDoEvaluatorFactory( + EvaluationContext evaluationContext, + ParDoEvaluator.DoFnRunnerFactory<InputT, OutputT> runnerFactory) { this.evaluationContext = evaluationContext; + this.runnerFactory = runnerFactory; fnClones = CacheBuilder.newBuilder() .build( @@ -148,7 +152,8 @@ final class ParDoEvaluatorFactory<InputT, OutputT> implements TransformEvaluator sideInputs, mainOutputTag, additionalOutputTags, - pcollections(application.getOutputs())); + pcollections(application.getOutputs()), + runnerFactory); } catch (Exception e) { try { fnManager.remove(); @@ -162,7 +167,7 @@ final class ParDoEvaluatorFactory<InputT, OutputT> implements TransformEvaluator } } - private Map<TupleTag<?>, PCollection<?>> pcollections(Map<TupleTag<?>, PValue> outputs) { + static Map<TupleTag<?>, PCollection<?>> pcollections(Map<TupleTag<?>, PValue> outputs) { Map<TupleTag<?>, PCollection<?>> pcs = new HashMap<>(); for (Map.Entry<TupleTag<?>, PValue> output : outputs.entrySet()) { pcs.put(output.getKey(), (PCollection<?>) output.getValue()); http://git-wip-us.apache.org/repos/asf/beam/blob/b93de58f/runners/direct-java/src/main/java/org/apache/beam/runners/direct/SplittableProcessElementsEvaluatorFactory.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/SplittableProcessElementsEvaluatorFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/SplittableProcessElementsEvaluatorFactory.java index 00b16dd..7efdb52 100644 --- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/SplittableProcessElementsEvaluatorFactory.java +++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/SplittableProcessElementsEvaluatorFactory.java @@ -18,25 +18,34 @@ package org.apache.beam.runners.direct; import java.util.Collection; +import java.util.List; import java.util.concurrent.Executors; +import org.apache.beam.runners.core.DoFnRunners; import org.apache.beam.runners.core.DoFnRunners.OutputManager; import org.apache.beam.runners.core.ElementAndRestriction; import org.apache.beam.runners.core.KeyedWorkItem; import org.apache.beam.runners.core.OutputAndTimeBoundedSplittableProcessElementInvoker; import org.apache.beam.runners.core.OutputWindowedValue; +import org.apache.beam.runners.core.PushbackSideInputDoFnRunner; import org.apache.beam.runners.core.SplittableParDo; +import org.apache.beam.runners.core.SplittableParDo.ProcessFn; import org.apache.beam.runners.core.StateInternals; import org.apache.beam.runners.core.StateInternalsFactory; import org.apache.beam.runners.core.TimerInternals; import org.apache.beam.runners.core.TimerInternalsFactory; import org.apache.beam.runners.direct.DirectRunner.CommittedBundle; +import org.apache.beam.sdk.options.PipelineOptions; import org.apache.beam.sdk.transforms.AppliedPTransform; +import org.apache.beam.sdk.transforms.DoFn; import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker; import org.apache.beam.sdk.transforms.windowing.BoundedWindow; import org.apache.beam.sdk.transforms.windowing.PaneInfo; +import org.apache.beam.sdk.util.ReadyCheckingSideInputReader; import org.apache.beam.sdk.util.WindowedValue; +import org.apache.beam.sdk.util.WindowingStrategy; import org.apache.beam.sdk.values.PCollection; import org.apache.beam.sdk.values.PCollectionTuple; +import org.apache.beam.sdk.values.PCollectionView; import org.apache.beam.sdk.values.TupleTag; import org.joda.time.Duration; import org.joda.time.Instant; @@ -51,7 +60,11 @@ class SplittableProcessElementsEvaluatorFactory< SplittableProcessElementsEvaluatorFactory(EvaluationContext evaluationContext) { this.evaluationContext = evaluationContext; - this.delegateFactory = new ParDoEvaluatorFactory<>(evaluationContext); + this.delegateFactory = + new ParDoEvaluatorFactory<>( + evaluationContext, + SplittableProcessElementsEvaluatorFactory + .<InputT, OutputT, RestrictionT>processFnRunnerFactory()); } @Override @@ -82,12 +95,12 @@ class SplittableProcessElementsEvaluatorFactory< final SplittableParDo.ProcessElements<InputT, OutputT, RestrictionT, TrackerT> transform = application.getTransform(); - SplittableParDo.ProcessFn<InputT, OutputT, RestrictionT, TrackerT> processFn = + ProcessFn<InputT, OutputT, RestrictionT, TrackerT> processFn = transform.newProcessFn(transform.getFn()); DoFnLifecycleManager fnManager = DoFnLifecycleManager.of(processFn); processFn = - ((SplittableParDo.ProcessFn<InputT, OutputT, RestrictionT, TrackerT>) + ((ProcessFn<InputT, OutputT, RestrictionT, TrackerT>) fnManager .<KeyedWorkItem<String, ElementAndRestriction<InputT, RestrictionT>>, OutputT> get()); @@ -98,7 +111,7 @@ class SplittableProcessElementsEvaluatorFactory< .getExecutionContext(application, inputBundle.getKey()) .getOrCreateStepContext(stepName, stepName); - ParDoEvaluator<KeyedWorkItem<String, ElementAndRestriction<InputT, RestrictionT>>> + final ParDoEvaluator<KeyedWorkItem<String, ElementAndRestriction<InputT, RestrictionT>>> parDoEvaluator = delegateFactory.createParDoEvaluator( application, @@ -127,34 +140,36 @@ class SplittableProcessElementsEvaluatorFactory< } }); - final OutputManager outputManager = parDoEvaluator.getOutputManager(); + OutputWindowedValue<OutputT> outputWindowedValue = + new OutputWindowedValue<OutputT>() { + private final OutputManager outputManager = parDoEvaluator.getOutputManager(); + + @Override + public void outputWindowedValue( + OutputT output, + Instant timestamp, + Collection<? extends BoundedWindow> windows, + PaneInfo pane) { + outputManager.output( + transform.getMainOutputTag(), WindowedValue.of(output, timestamp, windows, pane)); + } + + @Override + public <AdditionalOutputT> void outputWindowedValue( + TupleTag<AdditionalOutputT> tag, + AdditionalOutputT output, + Instant timestamp, + Collection<? extends BoundedWindow> windows, + PaneInfo pane) { + outputManager.output(tag, WindowedValue.of(output, timestamp, windows, pane)); + } + }; processFn.setProcessElementInvoker( new OutputAndTimeBoundedSplittableProcessElementInvoker< InputT, OutputT, RestrictionT, TrackerT>( transform.getFn(), evaluationContext.getPipelineOptions(), - new OutputWindowedValue<OutputT>() { - @Override - public void outputWindowedValue( - OutputT output, - Instant timestamp, - Collection<? extends BoundedWindow> windows, - PaneInfo pane) { - outputManager.output( - transform.getMainOutputTag(), - WindowedValue.of(output, timestamp, windows, pane)); - } - - @Override - public <AdditionalOutputT> void outputWindowedValue( - TupleTag<AdditionalOutputT> tag, - AdditionalOutputT output, - Instant timestamp, - Collection<? extends BoundedWindow> windows, - PaneInfo pane) { - outputManager.output(tag, WindowedValue.of(output, timestamp, windows, pane)); - } - }, + outputWindowedValue, evaluationContext.createSideInputReader(transform.getSideInputs()), // TODO: For better performance, use a higher-level executor? Executors.newSingleThreadScheduledExecutor(Executors.defaultThreadFactory()), @@ -163,4 +178,41 @@ class SplittableProcessElementsEvaluatorFactory< return DoFnLifecycleManagerRemovingTransformEvaluator.wrapping(parDoEvaluator, fnManager); } + + private static <InputT, OutputT, RestrictionT> + ParDoEvaluator.DoFnRunnerFactory< + KeyedWorkItem<String, ElementAndRestriction<InputT, RestrictionT>>, OutputT> + processFnRunnerFactory() { + return new ParDoEvaluator.DoFnRunnerFactory< + KeyedWorkItem<String, ElementAndRestriction<InputT, RestrictionT>>, OutputT>() { + @Override + public PushbackSideInputDoFnRunner< + KeyedWorkItem<String, ElementAndRestriction<InputT, RestrictionT>>, OutputT> + createRunner( + PipelineOptions options, + DoFn<KeyedWorkItem<String, ElementAndRestriction<InputT, RestrictionT>>, OutputT> fn, + List<PCollectionView<?>> sideInputs, + ReadyCheckingSideInputReader sideInputReader, + OutputManager outputManager, + TupleTag<OutputT> mainOutputTag, + List<TupleTag<?>> additionalOutputTags, + DirectExecutionContext.DirectStepContext stepContext, + AggregatorContainer.Mutator aggregatorChanges, + WindowingStrategy<?, ? extends BoundedWindow> windowingStrategy) { + ProcessFn<InputT, OutputT, RestrictionT, ?> processFn = + (ProcessFn) fn; + return DoFnRunners.newProcessFnRunner( + processFn, + options, + sideInputs, + sideInputReader, + outputManager, + mainOutputTag, + additionalOutputTags, + stepContext, + aggregatorChanges, + windowingStrategy); + } + }; + } } http://git-wip-us.apache.org/repos/asf/beam/blob/b93de58f/runners/direct-java/src/main/java/org/apache/beam/runners/direct/StatefulParDoEvaluatorFactory.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/StatefulParDoEvaluatorFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/StatefulParDoEvaluatorFactory.java index be77ea1..8793ae8 100644 --- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/StatefulParDoEvaluatorFactory.java +++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/StatefulParDoEvaluatorFactory.java @@ -65,7 +65,9 @@ final class StatefulParDoEvaluatorFactory<K, InputT, OutputT> implements Transfo private final ParDoEvaluatorFactory<KV<K, InputT>, OutputT> delegateFactory; StatefulParDoEvaluatorFactory(EvaluationContext evaluationContext) { - this.delegateFactory = new ParDoEvaluatorFactory<>(evaluationContext); + this.delegateFactory = + new ParDoEvaluatorFactory<>( + evaluationContext, ParDoEvaluator.<KV<K, InputT>, OutputT>defaultRunnerFactory()); this.cleanupRegistry = CacheBuilder.newBuilder() .weakValues() http://git-wip-us.apache.org/repos/asf/beam/blob/b93de58f/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformEvaluatorRegistry.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformEvaluatorRegistry.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformEvaluatorRegistry.java index ae7ad93..d06c460 100644 --- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformEvaluatorRegistry.java +++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/TransformEvaluatorRegistry.java @@ -52,7 +52,9 @@ class TransformEvaluatorRegistry implements TransformEvaluatorFactory { ImmutableMap.<Class<? extends PTransform>, TransformEvaluatorFactory>builder() .put(Read.Bounded.class, new BoundedReadEvaluatorFactory(ctxt)) .put(Read.Unbounded.class, new UnboundedReadEvaluatorFactory(ctxt)) - .put(ParDo.MultiOutput.class, new ParDoEvaluatorFactory<>(ctxt)) + .put( + ParDo.MultiOutput.class, + new ParDoEvaluatorFactory<>(ctxt, ParDoEvaluator.defaultRunnerFactory())) .put(StatefulParDo.class, new StatefulParDoEvaluatorFactory<>(ctxt)) .put(PCollections.class, new FlattenEvaluatorFactory(ctxt)) .put(WriteView.class, new ViewEvaluatorFactory(ctxt)) http://git-wip-us.apache.org/repos/asf/beam/blob/b93de58f/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ParDoEvaluatorTest.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ParDoEvaluatorTest.java b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ParDoEvaluatorTest.java index 2be0f9d..e99e4bf 100644 --- a/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ParDoEvaluatorTest.java +++ b/runners/direct-java/src/test/java/org/apache/beam/runners/direct/ParDoEvaluatorTest.java @@ -169,7 +169,8 @@ public class ParDoEvaluatorTest { ImmutableList.<PCollectionView<?>>of(singletonView), mainOutputTag, additionalOutputTags, - ImmutableMap.<TupleTag<?>, PCollection<?>>of(mainOutputTag, output)); + ImmutableMap.<TupleTag<?>, PCollection<?>>of(mainOutputTag, output), + ParDoEvaluator.<Integer, Integer>defaultRunnerFactory()); } private static class RecorderFn extends DoFn<Integer, Integer> {