Implement GroupAlsoByWindowsEvaluator Directly

Invoke the ReduceFnRunner methods directly rather than by constructing a
GroupAlsoByWindowDoFn and a ParDoEvaluator. Instead, construct a
ReduceFnRunner, ReduceFn, and invoke the methods directly.


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/6b36feab
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/6b36feab
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/6b36feab

Branch: refs/heads/apex-runner
Commit: 6b36feabd081ee7e174354e37b881abe8b38574e
Parents: 2a1fdee
Author: Thomas Groh <tg...@google.com>
Authored: Tue Nov 1 14:29:48 2016 -0700
Committer: Thomas Groh <tg...@google.com>
Committed: Thu Nov 3 15:15:58 2016 -0700

----------------------------------------------------------------------
 .../runners/direct/AggregatorContainer.java     |  20 +-
 .../GroupAlsoByWindowEvaluatorFactory.java      | 248 +++++++++++++++----
 2 files changed, 218 insertions(+), 50 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/6b36feab/runners/direct-java/src/main/java/org/apache/beam/runners/direct/AggregatorContainer.java
----------------------------------------------------------------------
diff --git 
a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/AggregatorContainer.java
 
b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/AggregatorContainer.java
index 03e2509..7b6bc64 100644
--- 
a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/AggregatorContainer.java
+++ 
b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/AggregatorContainer.java
@@ -151,8 +151,24 @@ public class AggregatorContainer {
 
     @Override
     public <InputT, AccumT, OutputT> Aggregator<InputT, OutputT> 
createAggregatorForDoFn(
-        Class<?> fnClass, ExecutionContext.StepContext step,
-        String name, CombineFn<InputT, AccumT, OutputT> combine) {
+        Class<?> fnClass,
+        ExecutionContext.StepContext step,
+        String name,
+        CombineFn<InputT, AccumT, OutputT> combine) {
+      return createAggregatorForStep(step, name, combine);
+    }
+
+    public <InputT, AccumT, OutputT> Aggregator<InputT, OutputT> 
createSystemAggregator(
+        ExecutionContext.StepContext step,
+        String name,
+        CombineFn<InputT, AccumT, OutputT> combiner) {
+      return createAggregatorForStep(step, name, combiner);
+    }
+
+    private <InputT, AccumT, OutputT> Aggregator<InputT, OutputT> 
createAggregatorForStep(
+        ExecutionContext.StepContext step,
+        String name,
+        CombineFn<InputT, AccumT, OutputT> combine) {
       checkState(!committed, "Cannot create aggregators after committing");
       AggregatorKey key = AggregatorKey.create(step.getStepName(), name);
       AggregatorInfo<?, ?, ?> aggregatorInfo = accumulatorDeltas.get(key);

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/6b36feab/runners/direct-java/src/main/java/org/apache/beam/runners/direct/GroupAlsoByWindowEvaluatorFactory.java
----------------------------------------------------------------------
diff --git 
a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/GroupAlsoByWindowEvaluatorFactory.java
 
b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/GroupAlsoByWindowEvaluatorFactory.java
index 37cc319..e5c5e4b 100644
--- 
a/runners/direct-java/src/main/java/org/apache/beam/runners/direct/GroupAlsoByWindowEvaluatorFactory.java
+++ 
b/runners/direct-java/src/main/java/org/apache/beam/runners/direct/GroupAlsoByWindowEvaluatorFactory.java
@@ -17,30 +17,47 @@
  */
 package org.apache.beam.runners.direct;
 
-import com.google.common.collect.ImmutableMap;
-import java.util.Collections;
-import org.apache.beam.runners.core.GroupAlsoByWindowViaWindowSetDoFn;
+import com.google.common.base.Function;
+import com.google.common.base.Predicate;
+import com.google.common.collect.FluentIterable;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Iterables;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import org.apache.beam.runners.core.GroupAlsoByWindowsDoFn;
 import org.apache.beam.runners.core.GroupByKeyViaGroupByKeyOnly;
 import 
org.apache.beam.runners.core.GroupByKeyViaGroupByKeyOnly.GroupAlsoByWindow;
 import org.apache.beam.runners.core.GroupByKeyViaGroupByKeyOnly.GroupByKeyOnly;
+import org.apache.beam.runners.core.ReduceFnRunner;
 import org.apache.beam.runners.core.SystemReduceFn;
+import org.apache.beam.runners.core.triggers.ExecutableTriggerStateMachine;
+import org.apache.beam.runners.core.triggers.TriggerStateMachines;
 import org.apache.beam.runners.direct.DirectExecutionContext.DirectStepContext;
 import org.apache.beam.runners.direct.DirectGroupByKey.DirectGroupAlsoByWindow;
 import org.apache.beam.runners.direct.DirectRunner.CommittedBundle;
+import org.apache.beam.runners.direct.DirectRunner.UncommittedBundle;
 import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.transforms.Aggregator;
 import org.apache.beam.sdk.transforms.AppliedPTransform;
-import org.apache.beam.sdk.transforms.OldDoFn;
 import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.Sum;
 import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.transforms.windowing.PaneInfo;
 import org.apache.beam.sdk.util.KeyedWorkItem;
+import org.apache.beam.sdk.util.TimerInternals;
+import org.apache.beam.sdk.util.TimerInternals.TimerData;
+import org.apache.beam.sdk.util.WindowTracing;
 import org.apache.beam.sdk.util.WindowedValue;
+import org.apache.beam.sdk.util.WindowingInternals;
 import org.apache.beam.sdk.util.WindowingStrategy;
+import org.apache.beam.sdk.util.state.CopyOnAccessInMemoryStateInternals;
 import org.apache.beam.sdk.util.state.StateInternals;
-import org.apache.beam.sdk.util.state.StateInternalsFactory;
 import org.apache.beam.sdk.values.KV;
 import org.apache.beam.sdk.values.PCollection;
 import org.apache.beam.sdk.values.PCollectionView;
 import org.apache.beam.sdk.values.TupleTag;
+import org.joda.time.Instant;
 
 /**
  * The {@link DirectRunner} {@link TransformEvaluatorFactory} for the
@@ -86,9 +103,23 @@ class GroupAlsoByWindowEvaluatorFactory implements 
TransformEvaluatorFactory {
    */
   private static class GroupAlsoByWindowEvaluator<K, V>
       implements TransformEvaluator<KeyedWorkItem<K, V>> {
-    private static final TupleTag<Object> MAIN_OUTPUT_TAG = new 
TupleTag<Object>() {};
+    private final EvaluationContext evaluationContext;
+    private final AppliedPTransform<
+        PCollection<KeyedWorkItem<K, V>>, PCollection<KV<K, Iterable<V>>>,
+        DirectGroupAlsoByWindow<K, V>>
+        application;
 
-    private final TransformEvaluator<KeyedWorkItem<K, V>> gabwParDoEvaluator;
+    private final DirectStepContext stepContext;
+    private @SuppressWarnings("unchecked") final WindowingStrategy<?, 
BoundedWindow>
+        windowingStrategy;
+
+    private final Collection<UncommittedBundle<?>> outputBundles;
+    private final ImmutableList.Builder<WindowedValue<KeyedWorkItem<K, V>>> 
unprocessedElements;
+    private final AggregatorContainer.Mutator aggregatorChanges;
+
+    private final SystemReduceFn<K, V, Iterable<V>, Iterable<V>, 
BoundedWindow> reduceFn;
+    private final Aggregator<Long, Long> droppedDueToClosedWindow;
+    private final Aggregator<Long, Long> droppedDueToLateness;
 
     public GroupAlsoByWindowEvaluator(
         final EvaluationContext evaluationContext,
@@ -97,67 +128,188 @@ class GroupAlsoByWindowEvaluatorFactory implements 
TransformEvaluatorFactory {
                 PCollection<KeyedWorkItem<K, V>>,
                 PCollection<KV<K, Iterable<V>>>,
                 DirectGroupAlsoByWindow<K, V>> application) {
+      this.evaluationContext = evaluationContext;
+      this.application = application;
 
-      Coder<V> valueCoder =
-          
application.getTransform().getValueCoder(inputBundle.getPCollection().getCoder());
-
-      @SuppressWarnings("unchecked")
-      WindowingStrategy<?, BoundedWindow> windowingStrategy =
+      stepContext = evaluationContext
+          .getExecutionContext(application, inputBundle.getKey())
+          .getOrCreateStepContext(
+              evaluationContext.getStepName(application), 
application.getTransform().getName());
+      windowingStrategy =
           (WindowingStrategy<?, BoundedWindow>)
               application.getTransform().getInputWindowingStrategy();
 
-      DirectStepContext stepContext =
-          evaluationContext
-              .getExecutionContext(application, inputBundle.getKey())
-              .getOrCreateStepContext(
-                  evaluationContext.getStepName(application), 
application.getTransform().getName());
+      outputBundles = new ArrayList<>();
+      unprocessedElements = ImmutableList.builder();
+      aggregatorChanges = evaluationContext.getAggregatorMutator();
 
-      StateInternals<K> stateInternals = (StateInternals<K>) 
stepContext.stateInternals();
-
-      OldDoFn<KeyedWorkItem<K, V>, KV<K, Iterable<V>>> gabwDoFn =
-          GroupAlsoByWindowViaWindowSetDoFn.create(
-              windowingStrategy,
-              new ConstantStateInternalsFactory<K>(stateInternals),
-              SystemReduceFn.<K, V, BoundedWindow>buffering(valueCoder));
-
-      // Not technically legit, as the application is not a ParDo
-      this.gabwParDoEvaluator =
-          ParDoEvaluator.create(
-              evaluationContext,
-              stepContext,
-              application,
-              windowingStrategy,
-              gabwDoFn,
-              Collections.<PCollectionView<?>>emptyList(),
-              MAIN_OUTPUT_TAG,
-              Collections.<TupleTag<?>>emptyList(),
-              ImmutableMap.<TupleTag<?>, PCollection<?>>of(
-                  MAIN_OUTPUT_TAG, application.getOutput()));
+      Coder<V> valueCoder =
+          
application.getTransform().getValueCoder(inputBundle.getPCollection().getCoder());
+      reduceFn = SystemReduceFn.buffering(valueCoder);
+      droppedDueToClosedWindow = 
aggregatorChanges.createSystemAggregator(stepContext,
+          GroupAlsoByWindowsDoFn.DROPPED_DUE_TO_CLOSED_WINDOW_COUNTER,
+          new Sum.SumLongFn());
+      droppedDueToLateness = 
aggregatorChanges.createSystemAggregator(stepContext,
+          GroupAlsoByWindowsDoFn.DROPPED_DUE_TO_LATENESS_COUNTER,
+          new Sum.SumLongFn());
     }
 
     @Override
     public void processElement(WindowedValue<KeyedWorkItem<K, V>> element) 
throws Exception {
-      gabwParDoEvaluator.processElement(element);
+      KeyedWorkItem<K, V> workItem = element.getValue();
+      K key = workItem.key();
+
+      UncommittedBundle<KV<K, Iterable<V>>> bundle =
+          evaluationContext.createBundle(application.getOutput());
+      outputBundles.add(bundle);
+      CopyOnAccessInMemoryStateInternals<K> stateInternals =
+          (CopyOnAccessInMemoryStateInternals<K>) stepContext.stateInternals();
+      DirectTimerInternals timerInternals = stepContext.timerInternals();
+      ReduceFnRunner<K, V, Iterable<V>, BoundedWindow> reduceFnRunner =
+          new ReduceFnRunner<>(
+              key,
+              windowingStrategy,
+              ExecutableTriggerStateMachine.create(
+                  
TriggerStateMachines.stateMachineForTrigger(windowingStrategy.getTrigger())),
+              stateInternals,
+              timerInternals,
+              new DirectWindowingInternals<>(bundle),
+              droppedDueToClosedWindow,
+              reduceFn,
+              evaluationContext.getPipelineOptions());
+
+      // Drop any elements within expired windows
+      reduceFnRunner.processElements(
+          dropExpiredWindows(key, workItem.elementsIterable(), 
timerInternals));
+      for (TimerData timer : workItem.timersIterable()) {
+        reduceFnRunner.onTimer(timer);
+      }
+      reduceFnRunner.persist();
     }
 
     @Override
     public TransformResult finishBundle() throws Exception {
-      return gabwParDoEvaluator.finishBundle();
+      // State is initialized within the constructor. It can never be null.
+      CopyOnAccessInMemoryStateInternals<?> state = stepContext.commitState();
+      return StepTransformResult.withHold(application, 
state.getEarliestWatermarkHold())
+          .withState(state)
+          .addOutput(outputBundles)
+          .withTimerUpdate(stepContext.getTimerUpdate())
+          .withAggregatorChanges(aggregatorChanges)
+          .addUnprocessedElements(unprocessedElements.build())
+          .build();
+    }
+
+    /**
+     * Returns an {@code Iterable<WindowedValue<InputT>>} that only contains 
non-late input
+     * elements.
+     */
+    public Iterable<WindowedValue<V>> dropExpiredWindows(
+        final K key, Iterable<WindowedValue<V>> elements, final TimerInternals 
timerInternals) {
+      return FluentIterable.from(elements)
+          .transformAndConcat(
+              // Explode windows to filter out expired ones
+              new Function<WindowedValue<V>, Iterable<WindowedValue<V>>>() {
+                @Override
+                public Iterable<WindowedValue<V>> apply(WindowedValue<V> 
input) {
+                  return input.explodeWindows();
+                }
+              })
+          .filter(
+              new Predicate<WindowedValue<V>>() {
+                @Override
+                public boolean apply(WindowedValue<V> input) {
+                  BoundedWindow window = 
Iterables.getOnlyElement(input.getWindows());
+                  boolean expired =
+                      window
+                          .maxTimestamp()
+                          .plus(windowingStrategy.getAllowedLateness())
+                          
.isBefore(timerInternals.currentInputWatermarkTime());
+                  if (expired) {
+                    // The element is too late for this window.
+                    droppedDueToLateness.addValue(1L);
+                    WindowTracing.debug(
+                        "GroupAlsoByWindow: Dropping element at {} for key: 
{}; "
+                            + "window: {} since it is too far behind 
inputWatermark: {}",
+                        input.getTimestamp(),
+                        key,
+                        window,
+                        timerInternals.currentInputWatermarkTime());
+                  }
+                  // Keep the element if the window is not expired.
+                  return !expired;
+                }
+              });
     }
   }
 
-  private static final class ConstantStateInternalsFactory<K>
-      implements StateInternalsFactory<K> {
-    private final StateInternals<K> stateInternals;
+  private static class DirectWindowingInternals<K, V>
+      implements WindowingInternals<Object, KV<K, Iterable<V>>> {
+    private final UncommittedBundle<KV<K, Iterable<V>>> bundle;
+
+    private DirectWindowingInternals(
+        UncommittedBundle<KV<K, Iterable<V>>> bundle) {
+      this.bundle = bundle;
+    }
 
-    private ConstantStateInternalsFactory(StateInternals<K> stateInternals) {
-      this.stateInternals = stateInternals;
+    @Override
+    public StateInternals<?> stateInternals() {
+      throw new UnsupportedOperationException(
+          String.format(
+              "%s should use the %s it is provided rather than the contents of 
%s",
+              ReduceFnRunner.class.getSimpleName(),
+              StateInternals.class.getSimpleName(),
+              getClass().getSimpleName()));
+    }
+
+    @Override
+    public void outputWindowedValue(
+        KV<K, Iterable<V>> output,
+        Instant timestamp,
+        Collection<? extends BoundedWindow> windows,
+        PaneInfo pane) {
+      bundle.add(WindowedValue.of(output, timestamp, windows, pane));
+    }
+
+    @Override
+    public TimerInternals timerInternals() {
+      throw new UnsupportedOperationException(
+          String.format(
+              "%s should use the %s it is provided rather than the contents of 
%s",
+              ReduceFnRunner.class.getSimpleName(),
+              TimerInternals.class.getSimpleName(),
+              getClass().getSimpleName()));
+    }
+
+    @Override
+    public Collection<? extends BoundedWindow> windows() {
+      throw new IllegalArgumentException(
+          String.format(
+              "%s should not access Windows via %s.windows(); "
+                  + "it should instead inspect the window of the input 
elements",
+              GroupAlsoByWindowEvaluator.class.getSimpleName(),
+              WindowingInternals.class.getSimpleName()));
+    }
+
+    @Override
+    public PaneInfo pane() {
+      throw new IllegalArgumentException(
+          String.format(
+              "%s should not access Windows via %s.windows(); "
+                  + "it should instead inspect the window of the input 
elements",
+              GroupAlsoByWindowEvaluator.class.getSimpleName(),
+              WindowingInternals.class.getSimpleName()));
+    }
+
+    @Override
+    public <T> void writePCollectionViewData(
+        TupleTag<?> tag, Iterable<WindowedValue<T>> data, Coder<T> elemCoder) 
throws IOException {
+      throw new UnsupportedOperationException();
     }
 
     @Override
-    @SuppressWarnings("unchecked")
-    public StateInternals<K> stateInternalsForKey(K key) {
-      return stateInternals;
+    public <T> T sideInput(PCollectionView<T> view, BoundedWindow 
mainInputWindow) {
+      throw new UnsupportedOperationException();
     }
   }
 }

Reply via email to