Repository: beam
Updated Branches:
  refs/heads/master a41afdc68 -> 61e31e622
Implementation of GroupAlsoByWindowViaWindowSet for the Spark runner.

Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/32a9d61e
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/32a9d61e
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/32a9d61e

Branch: refs/heads/master
Commit: 32a9d61eff79d8456f12a57c711e2b288f9c775b
Parents: bf0c119
Author: Sela <ans...@paypal.com>
Authored: Mon Feb 13 16:32:21 2017 +0200
Committer: Sela <ans...@paypal.com>
Committed: Wed Mar 1 00:17:58 2017 +0200

----------------------------------------------------------------------
 .../SparkGroupAlsoByWindowViaWindowSet.java     | 393 +++++++++++++++++++
 1 file changed, 393 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/32a9d61e/runners/spark/src/main/java/org/apache/beam/runners/spark/stateful/SparkGroupAlsoByWindowViaWindowSet.java
----------------------------------------------------------------------
diff --git a/runners/spark/src/main/java/org/apache/beam/runners/spark/stateful/SparkGroupAlsoByWindowViaWindowSet.java b/runners/spark/src/main/java/org/apache/beam/runners/spark/stateful/SparkGroupAlsoByWindowViaWindowSet.java
new file mode 100644
index 0000000..7902d7c
--- /dev/null
+++ b/runners/spark/src/main/java/org/apache/beam/runners/spark/stateful/SparkGroupAlsoByWindowViaWindowSet.java
@@ -0,0 +1,393 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.runners.spark.stateful;
+
+import com.google.common.collect.AbstractIterator;
+import com.google.common.collect.Table;
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import org.apache.beam.runners.core.GroupAlsoByWindowsDoFn;
+import org.apache.beam.runners.core.OutputWindowedValue;
+import org.apache.beam.runners.core.ReduceFnRunner;
+import org.apache.beam.runners.core.SystemReduceFn;
+import org.apache.beam.runners.core.TimerInternals.TimerData;
+import org.apache.beam.runners.core.triggers.ExecutableTriggerStateMachine;
+import org.apache.beam.runners.core.triggers.TriggerStateMachines;
+import org.apache.beam.runners.spark.SparkPipelineOptions;
+import org.apache.beam.runners.spark.translation.SparkRuntimeContext;
+import org.apache.beam.runners.spark.translation.TranslationUtils;
+import org.apache.beam.runners.spark.translation.WindowingHelpers;
+import org.apache.beam.runners.spark.util.GlobalWatermarkHolder;
+import org.apache.beam.runners.spark.util.LateDataUtils;
+import org.apache.beam.runners.spark.util.UnsupportedSideInputReader;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.transforms.Aggregator;
+import org.apache.beam.sdk.transforms.Combine;
+import org.apache.beam.sdk.transforms.Sum;
+import org.apache.beam.sdk.transforms.windowing.BoundedWindow;
+import org.apache.beam.sdk.transforms.windowing.PaneInfo;
+import org.apache.beam.sdk.util.WindowedValue;
+import org.apache.beam.sdk.util.WindowingStrategy;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.TupleTag;
+import org.apache.spark.Partitioner;
+import org.apache.spark.api.java.JavaSparkContext$;
+import org.apache.spark.api.java.function.FlatMapFunction;
+import org.apache.spark.api.java.function.Function;
+import org.apache.spark.streaming.Duration;
+import org.apache.spark.streaming.api.java.JavaDStream;
+import org.apache.spark.streaming.api.java.JavaPairDStream;
+import org.apache.spark.streaming.dstream.DStream;
+import org.apache.spark.streaming.dstream.PairDStreamFunctions;
+import org.joda.time.Instant;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import scala.Function1;
+import scala.Option;
+import scala.Tuple2;
+import scala.Tuple3;
+import scala.collection.Seq;
+import scala.reflect.ClassTag;
+import scala.runtime.AbstractFunction1;
+
+/**
+ * An implementation of {@link org.apache.beam.runners.core.GroupAlsoByWindowViaWindowSetDoFn}
+ * logic for grouping by windows and controlling trigger firings and pane accumulation.
+ *
+ * <p>This implementation is a composite of Spark transformations revolving around state management
+ * using Spark's
+ * {@link PairDStreamFunctions#updateStateByKey(Function1, Partitioner, boolean, ClassTag)}
+ * to update state with new data and timers.
+ *
+ * <p>Using updateStateByKey allows the runner to scan through the entire state, visiting not
+ * just updated keys (those with new values) but also checking whether existing timers are ready
+ * to fire. Since updateStateByKey requires the state and output types to be the same,
+ * a (state, output) tuple is used, and the state (as well as the output, when nothing fired)
+ * is filtered out in the following steps.
+ */
+public class SparkGroupAlsoByWindowViaWindowSet {
+  private static final Logger LOG =
+      LoggerFactory.getLogger(SparkGroupAlsoByWindowViaWindowSet.class);
+
+  /**
+   * A helper class that is essentially a {@link Serializable} {@link AbstractFunction1}.
+   */
+  private abstract static class SerializableFunction1<T1, T2>
+      extends AbstractFunction1<T1, T2> implements Serializable {
+  }
+
+  public static <K, InputT, W extends BoundedWindow>
+      JavaDStream<WindowedValue<KV<K, Iterable<InputT>>>> groupAlsoByWindow(
+          JavaDStream<WindowedValue<KV<K, Iterable<WindowedValue<InputT>>>>> inputDStream,
+          final Coder<InputT> iCoder,
+          final WindowingStrategy<?, W> windowingStrategy,
+          final SparkRuntimeContext runtimeContext,
+          final List<Integer> sourceIds) {
+
+    Long checkpointDuration =
+        runtimeContext.getPipelineOptions().as(SparkPipelineOptions.class)
+            .getCheckpointDurationMillis();
+
+    // we have to switch to the Scala API to avoid Optional in the Java API, see: SPARK-4819.
+    // the Scala API is also broader (access to the actual key and the entire iterator).
+    DStream<Tuple2<K, Iterable<WindowedValue<InputT>>>> pairDStream =
+        inputDStream
+            .map(WindowingHelpers.<KV<K, Iterable<WindowedValue<InputT>>>>unwindowFunction())
+            .mapToPair(TranslationUtils.<K, Iterable<WindowedValue<InputT>>>toPairFunction())
+            .dstream();
+    PairDStreamFunctions<K, Iterable<WindowedValue<InputT>>> pairDStreamFunctions =
+        DStream.toPairDStreamFunctions(
+            pairDStream,
+            JavaSparkContext$.MODULE$.<K>fakeClassTag(),
+            JavaSparkContext$.MODULE$.<Iterable<WindowedValue<InputT>>>fakeClassTag(),
+            null);
+    int defaultNumPartitions = pairDStreamFunctions.defaultPartitioner$default$1();
+    Partitioner partitioner = pairDStreamFunctions.defaultPartitioner(defaultNumPartitions);
+
+    // use updateStateByKey to scan through the state and update elements and timers.
+    DStream<Tuple2<K, Tuple2<StateAndTimers, List<WindowedValue<KV<K, Iterable<InputT>>>>>>>
+        firedStream = pairDStreamFunctions.updateStateByKey(
+            new SerializableFunction1<
+                scala.collection.Iterator<Tuple3<K, Seq<Iterable<WindowedValue<InputT>>>,
+                    Option<Tuple2<StateAndTimers,
+                        List<WindowedValue<KV<K, Iterable<InputT>>>>>>>>,
+                scala.collection.Iterator<Tuple2<K, Tuple2<StateAndTimers,
+                    List<WindowedValue<KV<K, Iterable<InputT>>>>>>>>() {
+
+      @Override
+      public scala.collection.Iterator<Tuple2<K, Tuple2<StateAndTimers,
+          List<WindowedValue<KV<K, Iterable<InputT>>>>>>> apply(
+              final scala.collection.Iterator<Tuple3<K, Seq<Iterable<WindowedValue<InputT>>>,
+                  Option<Tuple2<StateAndTimers,
+                      List<WindowedValue<KV<K, Iterable<InputT>>>>>>>> iter) {
+        //--- ACTUAL STATEFUL OPERATION:
+        //
+        // Input Iterator: the partition (~bundle) of a cogrouping of the input
+        // and the previous state (if it exists).
+        //
+        // Output Iterator: the output key, and the updated state.
+        //
+        // possible input scenarios for (K, Seq, Option<S>):
+        // (1) Option<S>.isEmpty: new data with no previous state.
+        // (2) Seq.isEmpty: no new data, but evaluating previous state (timer-like behaviour).
+        // (3) Seq.nonEmpty && Option<S>.isDefined: new data with previous state.
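+        //
+        // For example, under a fixed-window aggregation a key typically shows up as (1)
+        // in the first batch that carries data for it, then as (2) in subsequent batches
+        // (state only, no new values) until the watermark passes the end of the window
+        // and its timer fires, after which its state may be evicted.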
+
+        final SystemReduceFn<K, InputT, Iterable<InputT>, Iterable<InputT>, W> reduceFn =
+            SystemReduceFn.buffering(iCoder);
+        final OutputWindowedValueHolder<K, InputT> outputHolder =
+            new OutputWindowedValueHolder<>();
+        // use in-memory Aggregators since Spark Accumulators are not resilient in stateful
+        // operators; their values are logged (and reset) once this partition is done.
+        final InMemoryLongSumAggregator droppedDueToClosedWindow = new InMemoryLongSumAggregator(
+            GroupAlsoByWindowsDoFn.DROPPED_DUE_TO_CLOSED_WINDOW_COUNTER);
+        final InMemoryLongSumAggregator droppedDueToLateness = new InMemoryLongSumAggregator(
+            GroupAlsoByWindowsDoFn.DROPPED_DUE_TO_LATENESS_COUNTER);
+
+        AbstractIterator<
+            Tuple2<K, Tuple2<StateAndTimers, List<WindowedValue<KV<K, Iterable<InputT>>>>>>>
+            outIter = new AbstractIterator<Tuple2<K,
+                Tuple2<StateAndTimers, List<WindowedValue<KV<K, Iterable<InputT>>>>>>>() {
+          @Override
+          protected Tuple2<K, Tuple2<StateAndTimers,
+              List<WindowedValue<KV<K, Iterable<InputT>>>>>> computeNext() {
+            // the input iterator is a Spark partition (~bundle), containing keys and their
+            // (possibly) previous state and (possibly) new data.
+            while (iter.hasNext()) {
+              // for each element in the partition:
+              Tuple3<K, Seq<Iterable<WindowedValue<InputT>>>, Option<Tuple2<StateAndTimers,
+                  List<WindowedValue<KV<K, Iterable<InputT>>>>>>> next = iter.next();
+              K key = next._1();
+              Seq<Iterable<WindowedValue<InputT>>> seq = next._2();
+              Option<Tuple2<StateAndTimers, List<WindowedValue<KV<K, Iterable<InputT>>>>>>
+                  prevStateAndTimersOpt = next._3();
+
+              SparkStateInternals<K> stateInternals;
+              SparkTimerInternals timerInternals = SparkTimerInternals.forStreamFromSources(
+                  sourceIds, GlobalWatermarkHolder.get());
+              // get the state (internals) for this key.
+              if (prevStateAndTimersOpt.isEmpty()) {
+                // no previous state.
+                stateInternals = SparkStateInternals.forKey(key);
+              } else {
+                // with pre-existing state.
+                StateAndTimers prevStateAndTimers = prevStateAndTimersOpt.get()._1();
+                stateInternals = SparkStateInternals.forKeyAndState(key,
+                    prevStateAndTimers.getState());
+                timerInternals.addTimers(prevStateAndTimers.getTimers());
+              }
+
+              ReduceFnRunner<K, InputT, Iterable<InputT>, W> reduceFnRunner =
+                  new ReduceFnRunner<>(
+                      key,
+                      windowingStrategy,
+                      ExecutableTriggerStateMachine.create(
+                          TriggerStateMachines.stateMachineForTrigger(
+                              windowingStrategy.getTrigger())),
+                      stateInternals,
+                      timerInternals,
+                      outputHolder,
+                      new UnsupportedSideInputReader("GroupAlsoByWindow"),
+                      droppedDueToClosedWindow,
+                      reduceFn,
+                      runtimeContext.getPipelineOptions());
+
+              outputHolder.clear(); // clear before potential use.
+              if (!seq.isEmpty()) {
+                // new input for this key.
+                try {
+                  Iterable<WindowedValue<InputT>> elementsIterable = seq.head();
+                  Iterable<WindowedValue<InputT>> validElements =
+                      LateDataUtils
+                          .dropExpiredWindows(
+                              key,
+                              elementsIterable,
+                              timerInternals,
+                              windowingStrategy,
+                              droppedDueToLateness);
+                  reduceFnRunner.processElements(validElements);
+                } catch (Exception e) {
+                  throw new RuntimeException(
+                      "Failed to process element with ReduceFnRunner", e);
+                }
+              } else if (stateInternals.getState().isEmpty()) {
+                // no input and no state -> GC evict now.
+                continue;
+              }
+              try {
+                // advance the watermark to the high-water mark so eligible timers can fire.
+                timerInternals.advanceWatermark();
+                // fire the timers that are ready.
+                reduceFnRunner.onTimers(timerInternals.getTimersReadyToProcess());
+              } catch (Exception e) {
+                throw new RuntimeException(
+                    "Failed to process ReduceFnRunner onTimer.", e);
+              }
+              // this is mostly symbolic since the actual persist is done by emitting output.
+              reduceFnRunner.persist();
+              // obtain the output, if any was fired.
+              List<WindowedValue<KV<K, Iterable<InputT>>>> outputs = outputHolder.get();
+              if (!outputs.isEmpty() || !stateInternals.getState().isEmpty()) {
+                StateAndTimers updated = new StateAndTimers(stateInternals.getState(),
+                    timerInternals.getTimers());
+                // persist Spark's state by outputting.
+                return new Tuple2<>(key, new Tuple2<>(updated, outputs));
+              }
+              // an empty state with no output can be evicted completely - do nothing.
+            }
+            // log and reset the dropped-element counters here, once the partition has been
+            // fully consumed; this iterator is evaluated lazily, so logging any earlier
+            // (e.g. right after constructing the iterator) would always observe zero.
+            long lateDropped = droppedDueToLateness.getSum();
+            if (lateDropped > 0) {
+              LOG.info(String.format("Dropped %d elements due to lateness.", lateDropped));
+              droppedDueToLateness.zero();
+            }
+            long closedWindowDropped = droppedDueToClosedWindow.getSum();
+            if (closedWindowDropped > 0) {
+              LOG.info(String.format("Dropped %d elements due to closed window.",
+                  closedWindowDropped));
+              droppedDueToClosedWindow.zero();
+            }
+            return endOfData();
+          }
+        };
+
+        return scala.collection.JavaConversions.asScalaIterator(outIter);
+      }
+    }, partitioner, true, JavaSparkContext$.MODULE$.<Tuple2<StateAndTimers,
+        List<WindowedValue<KV<K, Iterable<InputT>>>>>>fakeClassTag())
+        .checkpoint(new Duration(checkpointDuration));
+
+    // go back to Java now.
+    JavaPairDStream<K, Tuple2<StateAndTimers, List<WindowedValue<KV<K, Iterable<InputT>>>>>>
+        javaFiredStream = JavaPairDStream.fromPairDStream(
+            firedStream,
+            JavaSparkContext$.MODULE$.<K>fakeClassTag(),
+            JavaSparkContext$.MODULE$.<Tuple2<StateAndTimers,
+                List<WindowedValue<KV<K, Iterable<InputT>>>>>>fakeClassTag());
+
+    // filter state-only output (nothing to fire) and remove the state from the output.
+    return javaFiredStream.filter(
+        new Function<Tuple2<K, Tuple2<StateAndTimers,
+            List<WindowedValue<KV<K, Iterable<InputT>>>>>>, Boolean>() {
+          @Override
+          public Boolean call(
+              Tuple2<K, Tuple2<StateAndTimers,
+                  List<WindowedValue<KV<K, Iterable<InputT>>>>>> t2) throws Exception {
+            // keep only elements that have output to fire.
+            return !t2._2()._2().isEmpty();
+          }
+        })
+        .flatMap(
+            new FlatMapFunction<Tuple2<K, Tuple2<StateAndTimers,
+                List<WindowedValue<KV<K, Iterable<InputT>>>>>>,
+                WindowedValue<KV<K, Iterable<InputT>>>>() {
+              @Override
+              public Iterable<WindowedValue<KV<K, Iterable<InputT>>>> call(
+                  Tuple2<K, Tuple2<StateAndTimers,
+                      List<WindowedValue<KV<K, Iterable<InputT>>>>>> t2) throws Exception {
+                // drop the state since it has already been persisted at this point.
+                return t2._2()._2();
+              }
+            });
+  }
+
+  private static class StateAndTimers {
+    // Serializable state for internals (namespace -> state tag -> coded value).
+    private final Table<String, String, byte[]> state;
+    private final Collection<TimerData> timers;
+
+    private StateAndTimers(
+        Table<String, String, byte[]> state, Collection<TimerData> timers) {
+      this.state = state;
+      this.timers = timers;
+    }
+
+    public Table<String, String, byte[]> getState() {
+      return state;
+    }
+
+    public Collection<TimerData> getTimers() {
+      return timers;
+    }
+  }
+
+  private static class OutputWindowedValueHolder<K, V>
+      implements OutputWindowedValue<KV<K, Iterable<V>>> {
+    private List<WindowedValue<KV<K, Iterable<V>>>> windowedValues = new ArrayList<>();
+
+    @Override
+    public void outputWindowedValue(
+        KV<K, Iterable<V>> output,
+        Instant timestamp,
+        Collection<? extends BoundedWindow> windows,
+        PaneInfo pane) {
+      windowedValues.add(WindowedValue.of(output, timestamp, windows, pane));
+    }
+
+    private List<WindowedValue<KV<K, Iterable<V>>>> get() {
+      return windowedValues;
+    }
+
+    private void clear() {
+      windowedValues.clear();
+    }
+
+    @Override
+    public <SideOutputT> void sideOutputWindowedValue(
+        TupleTag<SideOutputT> tag,
+        SideOutputT output,
+        Instant timestamp,
+        Collection<? extends BoundedWindow> windows,
+        PaneInfo pane) {
+      throw new UnsupportedOperationException(
+          "Side outputs are not allowed in GroupAlsoByWindow.");
+    }
+  }
+
+  private static class InMemoryLongSumAggregator implements Aggregator<Long, Long> {
+    private final String name;
+    private long sum = 0;
+
+    InMemoryLongSumAggregator(String name) {
+      this.name = name;
+    }
+
+    public void zero() {
+      sum = 0;
+    }
+
+    public long getSum() {
+      return sum;
+    }
+
+    @Override
+    public void addValue(Long value) {
+      sum += value;
+    }
+
+    @Override
+    public String getName() {
+      return name;
+    }
+
+    @Override
+    public Combine.CombineFn<Long, ?, Long> getCombineFn() {
+      return Sum.ofLongs();
+    }
+  }
+}
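
For context, a minimal sketch of how a streaming translation might invoke this transform.
This is not part of the commit: the variable names (keyedDStream, strategy, context,
sourceIds) are hypothetical placeholders, and VarIntCoder merely stands in for whatever
value coder the pipeline actually uses.

    // a keyed stream of windowed values, e.g. the output of a GroupByKeyOnly-style shuffle:
    JavaDStream<WindowedValue<KV<String, Iterable<WindowedValue<Integer>>>>> keyedDStream = ...;

    JavaDStream<WindowedValue<KV<String, Iterable<Integer>>>> grouped =
        SparkGroupAlsoByWindowViaWindowSet.groupAlsoByWindow(
            keyedDStream,
            VarIntCoder.of(),  // hypothetical value coder
            strategy,          // the PCollection's WindowingStrategy
            context,           // the runner's SparkRuntimeContext
            sourceIds);        // source ids for watermark lookup in GlobalWatermarkHolder

The result carries only the fired output panes; the (state, output) tuples that
updateStateByKey produced are filtered and unwrapped before this stream is returned.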