Implement GroupAlsoByWindowsEvaluator Directly. Invoke the ReduceFnRunner methods directly rather than going through a GroupAlsoByWindowDoFn and a ParDoEvaluator: the evaluator now constructs a ReduceFnRunner and a ReduceFn itself and calls the runner's methods.
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/6b36feab Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/6b36feab Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/6b36feab Branch: refs/heads/apex-runner Commit: 6b36feabd081ee7e174354e37b881abe8b38574e Parents: 2a1fdee Author: Thomas Groh <tg...@google.com> Authored: Tue Nov 1 14:29:48 2016 -0700 Committer: Thomas Groh <tg...@google.com> Committed: Thu Nov 3 15:15:58 2016 -0700 ---------------------------------------------------------------------- .../runners/direct/AggregatorContainer.java | 20 +- .../GroupAlsoByWindowEvaluatorFactory.java | 248 +++++++++++++++---- 2 files changed, 218 insertions(+), 50 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/6b36feab/runners/direct-java/src/main/java/org/apache/beam/runners/direct/AggregatorContainer.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/AggregatorContainer.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/AggregatorContainer.java index 03e2509..7b6bc64 100644 --- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/AggregatorContainer.java +++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/AggregatorContainer.java @@ -151,8 +151,24 @@ public class AggregatorContainer { @Override public <InputT, AccumT, OutputT> Aggregator<InputT, OutputT> createAggregatorForDoFn( - Class<?> fnClass, ExecutionContext.StepContext step, - String name, CombineFn<InputT, AccumT, OutputT> combine) { + Class<?> fnClass, + ExecutionContext.StepContext step, + String name, + CombineFn<InputT, AccumT, OutputT> combine) { + return createAggregatorForStep(step, name, combine); + } + + public <InputT, AccumT, OutputT> 
Aggregator<InputT, OutputT> createSystemAggregator( + ExecutionContext.StepContext step, + String name, + CombineFn<InputT, AccumT, OutputT> combiner) { + return createAggregatorForStep(step, name, combiner); + } + + private <InputT, AccumT, OutputT> Aggregator<InputT, OutputT> createAggregatorForStep( + ExecutionContext.StepContext step, + String name, + CombineFn<InputT, AccumT, OutputT> combine) { checkState(!committed, "Cannot create aggregators after committing"); AggregatorKey key = AggregatorKey.create(step.getStepName(), name); AggregatorInfo<?, ?, ?> aggregatorInfo = accumulatorDeltas.get(key); http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/6b36feab/runners/direct-java/src/main/java/org/apache/beam/runners/direct/GroupAlsoByWindowEvaluatorFactory.java ---------------------------------------------------------------------- diff --git a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/GroupAlsoByWindowEvaluatorFactory.java b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/GroupAlsoByWindowEvaluatorFactory.java index 37cc319..e5c5e4b 100644 --- a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/GroupAlsoByWindowEvaluatorFactory.java +++ b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/GroupAlsoByWindowEvaluatorFactory.java @@ -17,30 +17,47 @@ */ package org.apache.beam.runners.direct; -import com.google.common.collect.ImmutableMap; -import java.util.Collections; -import org.apache.beam.runners.core.GroupAlsoByWindowViaWindowSetDoFn; +import com.google.common.base.Function; +import com.google.common.base.Predicate; +import com.google.common.collect.FluentIterable; +import com.google.common.collect.ImmutableList; +import com.google.common.collect.Iterables; +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collection; +import org.apache.beam.runners.core.GroupAlsoByWindowsDoFn; import org.apache.beam.runners.core.GroupByKeyViaGroupByKeyOnly; import 
org.apache.beam.runners.core.GroupByKeyViaGroupByKeyOnly.GroupAlsoByWindow; import org.apache.beam.runners.core.GroupByKeyViaGroupByKeyOnly.GroupByKeyOnly; +import org.apache.beam.runners.core.ReduceFnRunner; import org.apache.beam.runners.core.SystemReduceFn; +import org.apache.beam.runners.core.triggers.ExecutableTriggerStateMachine; +import org.apache.beam.runners.core.triggers.TriggerStateMachines; import org.apache.beam.runners.direct.DirectExecutionContext.DirectStepContext; import org.apache.beam.runners.direct.DirectGroupByKey.DirectGroupAlsoByWindow; import org.apache.beam.runners.direct.DirectRunner.CommittedBundle; +import org.apache.beam.runners.direct.DirectRunner.UncommittedBundle; import org.apache.beam.sdk.coders.Coder; +import org.apache.beam.sdk.transforms.Aggregator; import org.apache.beam.sdk.transforms.AppliedPTransform; -import org.apache.beam.sdk.transforms.OldDoFn; import org.apache.beam.sdk.transforms.PTransform; +import org.apache.beam.sdk.transforms.Sum; import org.apache.beam.sdk.transforms.windowing.BoundedWindow; +import org.apache.beam.sdk.transforms.windowing.PaneInfo; import org.apache.beam.sdk.util.KeyedWorkItem; +import org.apache.beam.sdk.util.TimerInternals; +import org.apache.beam.sdk.util.TimerInternals.TimerData; +import org.apache.beam.sdk.util.WindowTracing; import org.apache.beam.sdk.util.WindowedValue; +import org.apache.beam.sdk.util.WindowingInternals; import org.apache.beam.sdk.util.WindowingStrategy; +import org.apache.beam.sdk.util.state.CopyOnAccessInMemoryStateInternals; import org.apache.beam.sdk.util.state.StateInternals; -import org.apache.beam.sdk.util.state.StateInternalsFactory; import org.apache.beam.sdk.values.KV; import org.apache.beam.sdk.values.PCollection; import org.apache.beam.sdk.values.PCollectionView; import org.apache.beam.sdk.values.TupleTag; +import org.joda.time.Instant; /** * The {@link DirectRunner} {@link TransformEvaluatorFactory} for the @@ -86,9 +103,23 @@ class 
GroupAlsoByWindowEvaluatorFactory implements TransformEvaluatorFactory { */ private static class GroupAlsoByWindowEvaluator<K, V> implements TransformEvaluator<KeyedWorkItem<K, V>> { - private static final TupleTag<Object> MAIN_OUTPUT_TAG = new TupleTag<Object>() {}; + private final EvaluationContext evaluationContext; + private final AppliedPTransform< + PCollection<KeyedWorkItem<K, V>>, PCollection<KV<K, Iterable<V>>>, + DirectGroupAlsoByWindow<K, V>> + application; - private final TransformEvaluator<KeyedWorkItem<K, V>> gabwParDoEvaluator; + private final DirectStepContext stepContext; + private @SuppressWarnings("unchecked") final WindowingStrategy<?, BoundedWindow> + windowingStrategy; + + private final Collection<UncommittedBundle<?>> outputBundles; + private final ImmutableList.Builder<WindowedValue<KeyedWorkItem<K, V>>> unprocessedElements; + private final AggregatorContainer.Mutator aggregatorChanges; + + private final SystemReduceFn<K, V, Iterable<V>, Iterable<V>, BoundedWindow> reduceFn; + private final Aggregator<Long, Long> droppedDueToClosedWindow; + private final Aggregator<Long, Long> droppedDueToLateness; public GroupAlsoByWindowEvaluator( final EvaluationContext evaluationContext, @@ -97,67 +128,188 @@ class GroupAlsoByWindowEvaluatorFactory implements TransformEvaluatorFactory { PCollection<KeyedWorkItem<K, V>>, PCollection<KV<K, Iterable<V>>>, DirectGroupAlsoByWindow<K, V>> application) { + this.evaluationContext = evaluationContext; + this.application = application; - Coder<V> valueCoder = - application.getTransform().getValueCoder(inputBundle.getPCollection().getCoder()); - - @SuppressWarnings("unchecked") - WindowingStrategy<?, BoundedWindow> windowingStrategy = + stepContext = evaluationContext + .getExecutionContext(application, inputBundle.getKey()) + .getOrCreateStepContext( + evaluationContext.getStepName(application), application.getTransform().getName()); + windowingStrategy = (WindowingStrategy<?, BoundedWindow>) 
application.getTransform().getInputWindowingStrategy(); - DirectStepContext stepContext = - evaluationContext - .getExecutionContext(application, inputBundle.getKey()) - .getOrCreateStepContext( - evaluationContext.getStepName(application), application.getTransform().getName()); + outputBundles = new ArrayList<>(); + unprocessedElements = ImmutableList.builder(); + aggregatorChanges = evaluationContext.getAggregatorMutator(); - StateInternals<K> stateInternals = (StateInternals<K>) stepContext.stateInternals(); - - OldDoFn<KeyedWorkItem<K, V>, KV<K, Iterable<V>>> gabwDoFn = - GroupAlsoByWindowViaWindowSetDoFn.create( - windowingStrategy, - new ConstantStateInternalsFactory<K>(stateInternals), - SystemReduceFn.<K, V, BoundedWindow>buffering(valueCoder)); - - // Not technically legit, as the application is not a ParDo - this.gabwParDoEvaluator = - ParDoEvaluator.create( - evaluationContext, - stepContext, - application, - windowingStrategy, - gabwDoFn, - Collections.<PCollectionView<?>>emptyList(), - MAIN_OUTPUT_TAG, - Collections.<TupleTag<?>>emptyList(), - ImmutableMap.<TupleTag<?>, PCollection<?>>of( - MAIN_OUTPUT_TAG, application.getOutput())); + Coder<V> valueCoder = + application.getTransform().getValueCoder(inputBundle.getPCollection().getCoder()); + reduceFn = SystemReduceFn.buffering(valueCoder); + droppedDueToClosedWindow = aggregatorChanges.createSystemAggregator(stepContext, + GroupAlsoByWindowsDoFn.DROPPED_DUE_TO_CLOSED_WINDOW_COUNTER, + new Sum.SumLongFn()); + droppedDueToLateness = aggregatorChanges.createSystemAggregator(stepContext, + GroupAlsoByWindowsDoFn.DROPPED_DUE_TO_LATENESS_COUNTER, + new Sum.SumLongFn()); } @Override public void processElement(WindowedValue<KeyedWorkItem<K, V>> element) throws Exception { - gabwParDoEvaluator.processElement(element); + KeyedWorkItem<K, V> workItem = element.getValue(); + K key = workItem.key(); + + UncommittedBundle<KV<K, Iterable<V>>> bundle = + evaluationContext.createBundle(application.getOutput()); + 
outputBundles.add(bundle); + CopyOnAccessInMemoryStateInternals<K> stateInternals = + (CopyOnAccessInMemoryStateInternals<K>) stepContext.stateInternals(); + DirectTimerInternals timerInternals = stepContext.timerInternals(); + ReduceFnRunner<K, V, Iterable<V>, BoundedWindow> reduceFnRunner = + new ReduceFnRunner<>( + key, + windowingStrategy, + ExecutableTriggerStateMachine.create( + TriggerStateMachines.stateMachineForTrigger(windowingStrategy.getTrigger())), + stateInternals, + timerInternals, + new DirectWindowingInternals<>(bundle), + droppedDueToClosedWindow, + reduceFn, + evaluationContext.getPipelineOptions()); + + // Drop any elements within expired windows + reduceFnRunner.processElements( + dropExpiredWindows(key, workItem.elementsIterable(), timerInternals)); + for (TimerData timer : workItem.timersIterable()) { + reduceFnRunner.onTimer(timer); + } + reduceFnRunner.persist(); } @Override public TransformResult finishBundle() throws Exception { - return gabwParDoEvaluator.finishBundle(); + // State is initialized within the constructor. It can never be null. + CopyOnAccessInMemoryStateInternals<?> state = stepContext.commitState(); + return StepTransformResult.withHold(application, state.getEarliestWatermarkHold()) + .withState(state) + .addOutput(outputBundles) + .withTimerUpdate(stepContext.getTimerUpdate()) + .withAggregatorChanges(aggregatorChanges) + .addUnprocessedElements(unprocessedElements.build()) + .build(); + } + + /** + * Returns an {@code Iterable<WindowedValue<InputT>>} that only contains non-late input + * elements. 
+ */ + public Iterable<WindowedValue<V>> dropExpiredWindows( + final K key, Iterable<WindowedValue<V>> elements, final TimerInternals timerInternals) { + return FluentIterable.from(elements) + .transformAndConcat( + // Explode windows to filter out expired ones + new Function<WindowedValue<V>, Iterable<WindowedValue<V>>>() { + @Override + public Iterable<WindowedValue<V>> apply(WindowedValue<V> input) { + return input.explodeWindows(); + } + }) + .filter( + new Predicate<WindowedValue<V>>() { + @Override + public boolean apply(WindowedValue<V> input) { + BoundedWindow window = Iterables.getOnlyElement(input.getWindows()); + boolean expired = + window + .maxTimestamp() + .plus(windowingStrategy.getAllowedLateness()) + .isBefore(timerInternals.currentInputWatermarkTime()); + if (expired) { + // The element is too late for this window. + droppedDueToLateness.addValue(1L); + WindowTracing.debug( + "GroupAlsoByWindow: Dropping element at {} for key: {}; " + + "window: {} since it is too far behind inputWatermark: {}", + input.getTimestamp(), + key, + window, + timerInternals.currentInputWatermarkTime()); + } + // Keep the element if the window is not expired. 
+ return !expired; + } + }); } } - private static final class ConstantStateInternalsFactory<K> - implements StateInternalsFactory<K> { - private final StateInternals<K> stateInternals; + private static class DirectWindowingInternals<K, V> + implements WindowingInternals<Object, KV<K, Iterable<V>>> { + private final UncommittedBundle<KV<K, Iterable<V>>> bundle; + + private DirectWindowingInternals( + UncommittedBundle<KV<K, Iterable<V>>> bundle) { + this.bundle = bundle; + } - private ConstantStateInternalsFactory(StateInternals<K> stateInternals) { - this.stateInternals = stateInternals; + @Override + public StateInternals<?> stateInternals() { + throw new UnsupportedOperationException( + String.format( + "%s should use the %s it is provided rather than the contents of %s", + ReduceFnRunner.class.getSimpleName(), + StateInternals.class.getSimpleName(), + getClass().getSimpleName())); + } + + @Override + public void outputWindowedValue( + KV<K, Iterable<V>> output, + Instant timestamp, + Collection<? extends BoundedWindow> windows, + PaneInfo pane) { + bundle.add(WindowedValue.of(output, timestamp, windows, pane)); + } + + @Override + public TimerInternals timerInternals() { + throw new UnsupportedOperationException( + String.format( + "%s should use the %s it is provided rather than the contents of %s", + ReduceFnRunner.class.getSimpleName(), + TimerInternals.class.getSimpleName(), + getClass().getSimpleName())); + } + + @Override + public Collection<? 
extends BoundedWindow> windows() { + throw new IllegalArgumentException( + String.format( + "%s should not access Windows via %s.windows(); " + + "it should instead inspect the window of the input elements", + GroupAlsoByWindowEvaluator.class.getSimpleName(), + WindowingInternals.class.getSimpleName())); + } + + @Override + public PaneInfo pane() { + throw new IllegalArgumentException( + String.format( + "%s should not access Windows via %s.windows(); " + + "it should instead inspect the window of the input elements", + GroupAlsoByWindowEvaluator.class.getSimpleName(), + WindowingInternals.class.getSimpleName())); + } + + @Override + public <T> void writePCollectionViewData( + TupleTag<?> tag, Iterable<WindowedValue<T>> data, Coder<T> elemCoder) throws IOException { + throw new UnsupportedOperationException(); } @Override - @SuppressWarnings("unchecked") - public StateInternals<K> stateInternalsForKey(K key) { - return stateInternals; + public <T> T sideInput(PCollectionView<T> view, BoundedWindow mainInputWindow) { + throw new UnsupportedOperationException(); } } }