Repository: beam
Updated Branches:
  refs/heads/master a41afdc68 -> 61e31e622


Implementation of GroupAlsoByWindowViaWindowSet for the Spark runner.


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/32a9d61e
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/32a9d61e
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/32a9d61e

Branch: refs/heads/master
Commit: 32a9d61eff79d8456f12a57c711e2b288f9c775b
Parents: bf0c119
Author: Sela <ans...@paypal.com>
Authored: Mon Feb 13 16:32:21 2017 +0200
Committer: Sela <ans...@paypal.com>
Committed: Wed Mar 1 00:17:58 2017 +0200

----------------------------------------------------------------------
 .../SparkGroupAlsoByWindowViaWindowSet.java     | 393 +++++++++++++++++++
 1 file changed, 393 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/32a9d61e/runners/spark/src/main/java/org/apache/beam/runners/spark/stateful/SparkGroupAlsoByWindowViaWindowSet.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/stateful/SparkGroupAlsoByWindowViaWindowSet.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/stateful/SparkGroupAlsoByWindowViaWindowSet.java
new file mode 100644
index 0000000..7902d7c
--- /dev/null
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/stateful/SparkGroupAlsoByWindowViaWindowSet.java
@@ -0,0 +1,393 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.spark.stateful;
+
+import com.google.common.collect.Table;
+import com.google.common.collect.AbstractIterator;
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import org.apache.beam.runners.core.GroupAlsoByWindowsDoFn;
+import org.apache.beam.runners.core.OutputWindowedValue;
+import org.apache.beam.runners.core.ReduceFnRunner;
+import org.apache.beam.runners.core.SystemReduceFn;
+import org.apache.beam.runners.core.TimerInternals.TimerData;
+import org.apache.beam.runners.core.triggers.ExecutableTriggerStateMachine;
+import org.apache.beam.runners.core.triggers.TriggerStateMachines;
+import org.apache.beam.runners.spark.SparkPipelineOptions;
+import org.apache.beam.runners.spark.translation.SparkRuntimeContext;
+import org.apache.beam.runners.spark.translation.TranslationUtils;
+import org.apache.beam.runners.spark.translation.WindowingHelpers;
+import org.apache.beam.runners.spark.util.GlobalWatermarkHolder;
+import org.apache.beam.runners.spark.util.LateDataUtils;
+import org.apache.beam.runners.spark.util.UnsupportedSideInputReader;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.transforms.Aggregator;
+import org.apache.beam.sdk.transforms.Combine;
+import org.apache.beam.sdk.transforms.Sum;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.transforms.windowing.PaneInfo;
+import org.apache.beam.sdk.util.WindowedValue;
+import org.apache.beam.sdk.util.WindowingStrategy;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.TupleTag;
+import org.apache.spark.Partitioner;
+import org.apache.spark.api.java.JavaSparkContext$;
+import org.apache.spark.api.java.function.FlatMapFunction;
+import org.apache.spark.api.java.function.Function;
+import org.apache.spark.streaming.Duration;
+import org.apache.spark.streaming.api.java.JavaDStream;
+import org.apache.spark.streaming.api.java.JavaPairDStream;
+import org.apache.spark.streaming.dstream.DStream;
+import org.apache.spark.streaming.dstream.PairDStreamFunctions;
+import org.joda.time.Instant;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import scala.Function1;
+import scala.Option;
+import scala.Tuple2;
+import scala.Tuple3;
+import scala.collection.Seq;
+import scala.reflect.ClassTag;
+import scala.runtime.AbstractFunction1;
+
+/**
+ * An implementation of {@link org.apache.beam.runners.core.GroupAlsoByWindowViaWindowSetDoFn}
+ * logic for grouping by windows and controlling trigger firings and pane accumulation.
+ *
+ * <p>This implementation is a composite of Spark transformations revolving around state
+ * management using Spark's
+ * {@link PairDStreamFunctions#updateStateByKey(Function1, Partitioner, boolean, ClassTag)}
+ * to update state with new data and timers.
+ *
+ * <p>Using updateStateByKey allows scanning through the entire state, visiting not just the
+ * updated state (new values for a key) but also checking whether timers are ready to fire.
+ * Since updateStateByKey bounds the types of state and output to be the same,
+ * a (state, output) tuple is used, filtering the state (and output if no firing)
+ * in the following steps.
+ */
+public class SparkGroupAlsoByWindowViaWindowSet {
+  private static final Logger LOG = 
LoggerFactory.getLogger(SparkGroupAlsoByWindowViaWindowSet.class);
+
+  /**
+   * A {@link Serializable} flavour of Scala's {@link AbstractFunction1}, to be subclassed by
+   * single-argument Scala functions that have to be serializable.
+   *
+   * @param <T1> the type of the function's argument.
+   * @param <T2> the type of the function's result.
+   */
+  private abstract static class SerializableFunction1<T1, T2>
+      extends AbstractFunction1<T1, T2>
+      implements Serializable {}
+
+  /**
+   * Groups the values of every key by window and emits output as triggers fire, keeping the
+   * per-key state and timers as the state of Spark's {@code updateStateByKey}.
+   *
+   * @param inputDStream the key-grouped input, each key paired with its windowed values.
+   * @param iCoder the coder of the input values, used to create the buffering reduce function.
+   * @param windowingStrategy the windowing strategy, supplying the trigger and used when
+   *     dropping the elements of expired windows.
+   * @param runtimeContext the runtime context, supplying the pipeline options.
+   * @param sourceIds the ids of the sources used to create the timer internals of each key.
+   * @return a stream of the fired output only, the state is removed from it.
+   */
+  public static <K, InputT, W extends BoundedWindow>
+      JavaDStream<WindowedValue<KV<K, Iterable<InputT>>>> groupAlsoByWindow(
+          JavaDStream<WindowedValue<KV<K, Iterable<WindowedValue<InputT>>>>> inputDStream,
+          final Coder<InputT> iCoder,
+          final WindowingStrategy<?, W> windowingStrategy,
+          final SparkRuntimeContext runtimeContext,
+          final List<Integer> sourceIds) {
+
+    Long checkpointDuration =
+        runtimeContext.getPipelineOptions().as(SparkPipelineOptions.class)
+            .getCheckpointDurationMillis();
+
+    // we have to switch to Scala API to avoid Optional in the Java API, see: SPARK-4819.
+    // we also have a broader API for Scala (access to the actual key and entire iterator).
+    DStream<Tuple2<K, Iterable<WindowedValue<InputT>>>> pairDStream =
+        inputDStream
+            .map(WindowingHelpers.<KV<K, Iterable<WindowedValue<InputT>>>>unwindowFunction())
+            .mapToPair(TranslationUtils.<K, Iterable<WindowedValue<InputT>>>toPairFunction())
+            .dstream();
+    PairDStreamFunctions<K, Iterable<WindowedValue<InputT>>> pairDStreamFunctions =
+        DStream.toPairDStreamFunctions(
+        pairDStream,
+        JavaSparkContext$.MODULE$.<K>fakeClassTag(),
+        JavaSparkContext$.MODULE$.<Iterable<WindowedValue<InputT>>>fakeClassTag(),
+        null);
+    int defaultNumPartitions = pairDStreamFunctions.defaultPartitioner$default$1();
+    Partitioner partitioner = pairDStreamFunctions.defaultPartitioner(defaultNumPartitions);
+
+    // use updateStateByKey to scan through the state and update elements and timers.
+    DStream<Tuple2<K, Tuple2<StateAndTimers, List<WindowedValue<KV<K, Iterable<InputT>>>>>>>
+        firedStream = pairDStreamFunctions.updateStateByKey(
+            new SerializableFunction1<
+                scala.collection.Iterator<Tuple3<K, Seq<Iterable<WindowedValue<InputT>>>,
+                    Option<Tuple2<StateAndTimers, List<WindowedValue<KV<K, Iterable<InputT>>>>>>>>,
+                scala.collection.Iterator<Tuple2<K, Tuple2<StateAndTimers,
+                    List<WindowedValue<KV<K, Iterable<InputT>>>>>>>>() {
+
+      @Override
+      public scala.collection.Iterator<Tuple2<K, Tuple2<StateAndTimers,
+          List<WindowedValue<KV<K, Iterable<InputT>>>>>>> apply(
+              final scala.collection.Iterator<Tuple3<K, Seq<Iterable<WindowedValue<InputT>>>,
+              Option<Tuple2<StateAndTimers, List<WindowedValue<KV<K, Iterable<InputT>>>>>>>> iter) {
+        //--- ACTUAL STATEFUL OPERATION:
+        //
+        // Input Iterator: the partition (~bundle) of a cogrouping of the input
+        // and the previous state (if exists).
+        //
+        // Output Iterator: the output key, and the updated state.
+        //
+        // possible input scenarios for (K, Seq, Option<S>):
+        // (1) Option<S>.isEmpty: new data with no previous state.
+        // (2) Seq.isEmpty: no new data, but evaluating previous state (timer-like behaviour).
+        // (3) Seq.nonEmpty && Option<S>.isDefined: new data with previous state.
+
+        final SystemReduceFn<K, InputT, Iterable<InputT>, Iterable<InputT>, W> reduceFn =
+            SystemReduceFn.buffering(iCoder);
+        final OutputWindowedValueHolder<K, InputT> outputHolder =
+            new OutputWindowedValueHolder<>();
+        // use in memory Aggregators since Spark Accumulators are not resilient
+        // in stateful operators, once done with this partition.
+        final InMemoryLongSumAggregator droppedDueToClosedWindow = new InMemoryLongSumAggregator(
+            GroupAlsoByWindowsDoFn.DROPPED_DUE_TO_CLOSED_WINDOW_COUNTER);
+        final InMemoryLongSumAggregator droppedDueToLateness = new InMemoryLongSumAggregator(
+            GroupAlsoByWindowsDoFn.DROPPED_DUE_TO_LATENESS_COUNTER);
+
+        // the iterator is lazy: nothing below runs until the returned iterator is consumed.
+        AbstractIterator<
+            Tuple2<K, Tuple2<StateAndTimers, List<WindowedValue<KV<K, Iterable<InputT>>>>>>>
+                outIter = new AbstractIterator<Tuple2<K,
+                    Tuple2<StateAndTimers, List<WindowedValue<KV<K, Iterable<InputT>>>>>>>() {
+                  @Override
+                  protected Tuple2<K, Tuple2<StateAndTimers,
+                      List<WindowedValue<KV<K, Iterable<InputT>>>>>> computeNext() {
+                    // input iterator is a Spark partition (~bundle), containing keys and their
+                    // (possibly) previous-state and (possibly) new data.
+                    while (iter.hasNext()) {
+                      // for each element in the partition:
+                      Tuple3<K, Seq<Iterable<WindowedValue<InputT>>>, Option<Tuple2<StateAndTimers,
+                          List<WindowedValue<KV<K, Iterable<InputT>>>>>>> next = iter.next();
+                      K key = next._1();
+
+                      Seq<Iterable<WindowedValue<InputT>>> seq = next._2();
+
+                      Option<Tuple2<StateAndTimers,
+                          List<WindowedValue<KV<K, Iterable<InputT>>>>>>
+                              prevStateAndTimersOpt = next._3();
+
+                      SparkStateInternals<K> stateInternals;
+                      SparkTimerInternals timerInternals = SparkTimerInternals.forStreamFromSources(
+                          sourceIds, GlobalWatermarkHolder.get());
+                      // get state(internals) per key.
+                      if (prevStateAndTimersOpt.isEmpty()) {
+                        // no previous state.
+                        stateInternals = SparkStateInternals.forKey(key);
+                      } else {
+                        // with pre-existing state.
+                        StateAndTimers prevStateAndTimers = prevStateAndTimersOpt.get()._1();
+                        stateInternals = SparkStateInternals.forKeyAndState(key,
+                            prevStateAndTimers.getState());
+                        timerInternals.addTimers(prevStateAndTimers.getTimers());
+                      }
+
+                      ReduceFnRunner<K, InputT, Iterable<InputT>, W> reduceFnRunner =
+                          new ReduceFnRunner<>(
+                              key,
+                              windowingStrategy,
+                              ExecutableTriggerStateMachine.create(
+                                  TriggerStateMachines.stateMachineForTrigger(
+                                      windowingStrategy.getTrigger())),
+                              stateInternals,
+                              timerInternals,
+                              outputHolder,
+                              new UnsupportedSideInputReader("GroupAlsoByWindow"),
+                              droppedDueToClosedWindow,
+                              reduceFn,
+                              runtimeContext.getPipelineOptions());
+
+                      outputHolder.clear(); // clear before potential use.
+                      if (!seq.isEmpty()) {
+                        // new input for key.
+                        try {
+                          // NOTE(review): only the first Iterable of the Seq is processed -
+                          // presumably the grouped input holds a single value per key and
+                          // batch; confirm against the caller.
+                          Iterable<WindowedValue<InputT>> elementsIterable = seq.head();
+                          Iterable<WindowedValue<InputT>> validElements =
+                              LateDataUtils
+                                  .dropExpiredWindows(
+                                      key,
+                                      elementsIterable,
+                                      timerInternals,
+                                      windowingStrategy,
+                                      droppedDueToLateness);
+                          reduceFnRunner.processElements(validElements);
+                        } catch (Exception e) {
+                          throw new RuntimeException(
+                              "Failed to process element with ReduceFnRunner", e);
+                        }
+                      } else if (stateInternals.getState().isEmpty()) {
+                        // no input and no state -> GC evict now.
+                        continue;
+                      }
+                      try {
+                        // advance the watermark to HWM to fire by timers.
+                        timerInternals.advanceWatermark();
+                        // call on timers that are ready.
+                        reduceFnRunner.onTimers(timerInternals.getTimersReadyToProcess());
+                      } catch (Exception e) {
+                        throw new RuntimeException(
+                            "Failed to process ReduceFnRunner onTimer.", e);
+                      }
+                      // this is mostly symbolic since actual persist is done by emitting output.
+                      reduceFnRunner.persist();
+                      // obtain output, if fired. the holder's list is shared by all the keys of
+                      // the partition and cleared before each of them, so emit a copy that is
+                      // not wiped once the next key is processed.
+                      List<WindowedValue<KV<K, Iterable<InputT>>>> outputs =
+                          new ArrayList<>(outputHolder.get());
+                      if (!outputs.isEmpty() || !stateInternals.getState().isEmpty()) {
+                        StateAndTimers updated = new StateAndTimers(stateInternals.getState(),
+                            timerInternals.getTimers());
+                        // persist Spark's state by outputting.
+                        return new Tuple2<>(key, new Tuple2<>(updated, outputs));
+                      }
+                      // an empty state with no output, can be evicted completely - do nothing.
+                    }
+
+                    // the partition is exhausted, so only now do the in-memory aggregators
+                    // hold their final values - log if there's something to log.
+                    long lateDropped = droppedDueToLateness.getSum();
+                    if (lateDropped > 0) {
+                      LOG.info(
+                          String.format("Dropped %d elements due to lateness.", lateDropped));
+                      droppedDueToLateness.zero();
+                    }
+                    long closedWindowDropped = droppedDueToClosedWindow.getSum();
+                    if (closedWindowDropped > 0) {
+                      LOG.info(
+                          String.format(
+                              "Dropped %d elements due to closed window.", closedWindowDropped));
+                      droppedDueToClosedWindow.zero();
+                    }
+                    return endOfData();
+                  }
+        };
+
+        return scala.collection.JavaConversions.asScalaIterator(outIter);
+      }
+    }, partitioner, true, JavaSparkContext$.MODULE$.<Tuple2<StateAndTimers,
+        List<WindowedValue<KV<K, Iterable<InputT>>>>>>fakeClassTag())
+            .checkpoint(new Duration(checkpointDuration));
+
+    // go back to Java now.
+    JavaPairDStream<K, Tuple2<StateAndTimers, List<WindowedValue<KV<K, Iterable<InputT>>>>>>
+        javaFiredStream = JavaPairDStream.fromPairDStream(
+            firedStream,
+            JavaSparkContext$.MODULE$.<K>fakeClassTag(),
+            JavaSparkContext$.MODULE$.<Tuple2<StateAndTimers,
+                List<WindowedValue<KV<K, Iterable<InputT>>>>>>fakeClassTag());
+
+    // filter state-only output (nothing to fire) and remove the state from the output.
+    return javaFiredStream.filter(
+        new Function<Tuple2<K, Tuple2<StateAndTimers,
+            List<WindowedValue<KV<K, Iterable<InputT>>>>>>, Boolean>() {
+              @Override
+              public Boolean call(
+                  Tuple2<K, Tuple2<StateAndTimers,
+                  List<WindowedValue<KV<K, Iterable<InputT>>>>>> t2) throws Exception {
+                // filter output if defined.
+                return !t2._2()._2().isEmpty();
+              }
+        })
+        .flatMap(
+            new FlatMapFunction<Tuple2<K, Tuple2<StateAndTimers,
+                List<WindowedValue<KV<K, Iterable<InputT>>>>>>,
+                WindowedValue<KV<K, Iterable<InputT>>>>() {
+              @Override
+              public Iterable<WindowedValue<KV<K, Iterable<InputT>>>> call(
+                  Tuple2<K, Tuple2<StateAndTimers,
+                  List<WindowedValue<KV<K, Iterable<InputT>>>>>> t2) throws Exception {
+                // drop the state since it is already persisted at this point.
+                return t2._2()._2();
+              }
+        });
+  }
+
+  /**
+   * A holder pairing the state of a key with the timers of that key.
+   */
+  private static class StateAndTimers {
+    // state for internals in a serializable form: namespace -> state tag -> coded value.
+    private final Table<String, String, byte[]> state;
+    private final Collection<TimerData> timers;
+
+    /**
+     * @param stateTable the state, as a table of namespace, state tag and coded value.
+     * @param timerData the timers.
+     */
+    private StateAndTimers(
+        Table<String, String, byte[]> stateTable,
+        Collection<TimerData> timerData) {
+      state = stateTable;
+      timers = timerData;
+    }
+
+    /** Returns the timers, the very collection given at construction. */
+    public Collection<TimerData> getTimers() {
+      return timers;
+    }
+
+    /** Returns the state, the very table given at construction. */
+    public Table<String, String, byte[]> getState() {
+      return state;
+    }
+  }
+
+  /**
+   * An {@link OutputWindowedValue} that collects the emitted values in a list, to be read
+   * with {@link #get()} and emptied with {@link #clear()}.
+   */
+  private static class OutputWindowedValueHolder<K, V>
+      implements OutputWindowedValue<KV<K, Iterable<V>>> {
+    private final List<WindowedValue<KV<K, Iterable<V>>>> collected = new ArrayList<>();
+
+    /** Empties the list of collected values, in place. */
+    private void clear() {
+      collected.clear();
+    }
+
+    /** Returns the backing list of collected values itself, not a copy of it. */
+    private List<WindowedValue<KV<K, Iterable<V>>>> get() {
+      return collected;
+    }
+
+    @Override
+    public void outputWindowedValue(
+        KV<K, Iterable<V>> output,
+        Instant timestamp,
+        Collection<? extends BoundedWindow> windows,
+        PaneInfo pane) {
+      WindowedValue<KV<K, Iterable<V>>> windowedValue =
+          WindowedValue.of(output, timestamp, windows, pane);
+      collected.add(windowedValue);
+    }
+
+    @Override
+    public <SideOutputT> void sideOutputWindowedValue(
+        TupleTag<SideOutputT> tag,
+        SideOutputT output,
+        Instant timestamp,
+        Collection<? extends BoundedWindow> windows,
+        PaneInfo pane) {
+      // only the main output is collected by this holder.
+      throw new UnsupportedOperationException(
+          "Side outputs are not allowed in GroupAlsoByWindow.");
+    }
+  }
+
+  /**
+   * An {@link Aggregator} that sums the added values into a plain in-memory {@code long},
+   * which can be read with {@link #getSum()} and reset with {@link #zero()}.
+   */
+  private static class InMemoryLongSumAggregator implements Aggregator<Long, Long> {
+    private final String name;
+    private long total = 0;
+
+    InMemoryLongSumAggregator(String name) {
+      this.name = name;
+    }
+
+    @Override
+    public String getName() {
+      return name;
+    }
+
+    @Override
+    public Combine.CombineFn<Long, ?, Long> getCombineFn() {
+      return Sum.ofLongs();
+    }
+
+    @Override
+    public void addValue(Long value) {
+      total = total + value;
+    }
+
+    /** Returns the sum of the values added since creation or the last {@link #zero()}. */
+    public long getSum() {
+      return total;
+    }
+
+    /** Resets the sum back to zero. */
+    public void zero() {
+      total = 0;
+    }
+  }
+}

Reply via email to