Repository: beam Updated Branches: refs/heads/master 75728a40c -> 3178f07b9
[BEAM-1772] Support merging WindowFn other than IntervalWindow on Flink Runner Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/d4a9f60d Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/d4a9f60d Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/d4a9f60d Branch: refs/heads/master Commit: d4a9f60df15f22f30869a1e19493b32de2506049 Parents: 75728a4 Author: JingsongLi <lzljs3620...@aliyun.com> Authored: Mon Mar 27 02:09:32 2017 +0800 Committer: Aljoscha Krettek <aljoscha.kret...@gmail.com> Committed: Tue Apr 25 15:58:56 2017 +0200 ---------------------------------------------------------------------- .../flink/FlinkBatchTransformTranslators.java | 78 ++----- .../functions/AbstractFlinkCombineRunner.java | 182 +++++++++++++++++ .../FlinkMergingNonShuffleReduceFunction.java | 165 ++------------- .../FlinkMergingPartialReduceFunction.java | 201 ------------------- .../functions/FlinkMergingReduceFunction.java | 199 ------------------ .../functions/FlinkPartialReduceFunction.java | 114 ++--------- .../functions/FlinkReduceFunction.java | 110 ++-------- .../functions/HashingFlinkCombineRunner.java | 173 ++++++++++++++++ .../functions/SortingFlinkCombineRunner.java | 185 +++++++++++++++++ 9 files changed, 611 insertions(+), 796 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/beam/blob/d4a9f60d/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkBatchTransformTranslators.java ---------------------------------------------------------------------- diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkBatchTransformTranslators.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkBatchTransformTranslators.java index cb33fc1..99de5be 100644 --- a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkBatchTransformTranslators.java +++ 
b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkBatchTransformTranslators.java @@ -31,8 +31,6 @@ import java.util.Map.Entry; import org.apache.beam.runners.flink.translation.functions.FlinkAssignWindows; import org.apache.beam.runners.flink.translation.functions.FlinkDoFnFunction; import org.apache.beam.runners.flink.translation.functions.FlinkMergingNonShuffleReduceFunction; -import org.apache.beam.runners.flink.translation.functions.FlinkMergingPartialReduceFunction; -import org.apache.beam.runners.flink.translation.functions.FlinkMergingReduceFunction; import org.apache.beam.runners.flink.translation.functions.FlinkMultiOutputPruningFunction; import org.apache.beam.runners.flink.translation.functions.FlinkPartialReduceFunction; import org.apache.beam.runners.flink.translation.functions.FlinkReduceFunction; @@ -62,7 +60,6 @@ import org.apache.beam.sdk.transforms.reflect.DoFnSignature; import org.apache.beam.sdk.transforms.reflect.DoFnSignatures; import org.apache.beam.sdk.transforms.windowing.BoundedWindow; import org.apache.beam.sdk.transforms.windowing.GlobalWindow; -import org.apache.beam.sdk.transforms.windowing.IntervalWindow; import org.apache.beam.sdk.transforms.windowing.Window; import org.apache.beam.sdk.transforms.windowing.WindowFn; import org.apache.beam.sdk.util.Reshuffle; @@ -75,6 +72,7 @@ import org.apache.beam.sdk.values.PValue; import org.apache.beam.sdk.values.TupleTag; import org.apache.flink.api.common.functions.FilterFunction; import org.apache.flink.api.common.functions.FlatMapFunction; +import org.apache.flink.api.common.functions.RichGroupReduceFunction; import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.api.java.DataSet; import org.apache.flink.api.java.operators.DataSource; @@ -221,48 +219,21 @@ class FlinkBatchTransformTranslators { Grouping<WindowedValue<KV<K, InputT>>> inputGrouping = inputDataSet.groupBy(new KvKeySelector<InputT, K>(inputCoder.getKeyCoder())); - 
FlinkPartialReduceFunction<K, InputT, List<InputT>, ?> partialReduceFunction; - FlinkReduceFunction<K, List<InputT>, List<InputT>, ?> reduceFunction; - - if (windowingStrategy.getWindowFn().isNonMerging()) { - @SuppressWarnings("unchecked") - WindowingStrategy<?, BoundedWindow> boundedStrategy = - (WindowingStrategy<?, BoundedWindow>) windowingStrategy; - - partialReduceFunction = new FlinkPartialReduceFunction<>( - combineFn, - boundedStrategy, - Collections.<PCollectionView<?>, WindowingStrategy<?, ?>>emptyMap(), - context.getPipelineOptions()); - - reduceFunction = new FlinkReduceFunction<>( - combineFn, - boundedStrategy, - Collections.<PCollectionView<?>, WindowingStrategy<?, ?>>emptyMap(), - context.getPipelineOptions()); + @SuppressWarnings("unchecked") + WindowingStrategy<Object, BoundedWindow> boundedStrategy = + (WindowingStrategy<Object, BoundedWindow>) windowingStrategy; - } else { - if (!windowingStrategy.getWindowFn().windowCoder().equals(IntervalWindow.getCoder())) { - throw new UnsupportedOperationException( - "Merging WindowFn with windows other than IntervalWindow are not supported."); - } + FlinkPartialReduceFunction<K, InputT, List<InputT>, ?> partialReduceFunction = + new FlinkPartialReduceFunction<>( + combineFn, boundedStrategy, + Collections.<PCollectionView<?>, WindowingStrategy<?, ?>>emptyMap(), + context.getPipelineOptions()); - @SuppressWarnings("unchecked") - WindowingStrategy<?, IntervalWindow> intervalStrategy = - (WindowingStrategy<?, IntervalWindow>) windowingStrategy; - - partialReduceFunction = new FlinkMergingPartialReduceFunction<>( - combineFn, - intervalStrategy, - Collections.<PCollectionView<?>, WindowingStrategy<?, ?>>emptyMap(), - context.getPipelineOptions()); - - reduceFunction = new FlinkMergingReduceFunction<>( - combineFn, - intervalStrategy, - Collections.<PCollectionView<?>, WindowingStrategy<?, ?>>emptyMap(), - context.getPipelineOptions()); - } + FlinkReduceFunction<K, List<InputT>, List<InputT>, ?> reduceFunction 
= + new FlinkReduceFunction<>( + combineFn, boundedStrategy, + Collections.<PCollectionView<?>, WindowingStrategy<?, ?>>emptyMap(), + context.getPipelineOptions()); // Partially GroupReduce the values into the intermediate format AccumT (combine) GroupCombineOperator< @@ -402,9 +373,10 @@ class FlinkBatchTransformTranslators { sideInputStrategies.put(sideInput, sideInput.getWindowingStrategyInternal()); } + WindowingStrategy<Object, BoundedWindow> boundedStrategy = + (WindowingStrategy<Object, BoundedWindow>) windowingStrategy; + if (windowingStrategy.getWindowFn().isNonMerging()) { - WindowingStrategy<?, BoundedWindow> boundedStrategy = - (WindowingStrategy<?, BoundedWindow>) windowingStrategy; FlinkPartialReduceFunction<K, InputT, AccumT, ?> partialReduceFunction = new FlinkPartialReduceFunction<>( @@ -449,23 +421,13 @@ class FlinkBatchTransformTranslators { context.setOutputDataSet(context.getOutput(transform), outputDataSet); } else { - if (!windowingStrategy.getWindowFn().windowCoder().equals(IntervalWindow.getCoder())) { - throw new UnsupportedOperationException( - "Merging WindowFn with windows other than IntervalWindow are not supported."); - } // for merging windows we can't to a pre-shuffle combine step since // elements would not be in their correct windows for side-input access - WindowingStrategy<?, IntervalWindow> intervalStrategy = - (WindowingStrategy<?, IntervalWindow>) windowingStrategy; - - FlinkMergingNonShuffleReduceFunction<K, InputT, AccumT, OutputT, ?> reduceFunction = - new FlinkMergingNonShuffleReduceFunction<>( - combineFn, - intervalStrategy, - sideInputStrategies, - context.getPipelineOptions()); + RichGroupReduceFunction<WindowedValue<KV<K, InputT>>, WindowedValue<KV<K, OutputT>>> + reduceFunction = new FlinkMergingNonShuffleReduceFunction<>( + combineFn, boundedStrategy, sideInputStrategies, context.getPipelineOptions()); TypeInformation<WindowedValue<KV<K, OutputT>>> reduceTypeInfo = context.getTypeInfo(context.getOutput(transform)); 
http://git-wip-us.apache.org/repos/asf/beam/blob/d4a9f60d/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/AbstractFlinkCombineRunner.java ---------------------------------------------------------------------- diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/AbstractFlinkCombineRunner.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/AbstractFlinkCombineRunner.java new file mode 100644 index 0000000..83ff70d --- /dev/null +++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/AbstractFlinkCombineRunner.java @@ -0,0 +1,182 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.beam.runners.flink.translation.functions; + +import com.google.common.collect.ImmutableList; +import java.util.Collection; +import org.apache.beam.runners.core.PerKeyCombineFnRunner; +import org.apache.beam.runners.core.PerKeyCombineFnRunners; +import org.apache.beam.sdk.options.PipelineOptions; +import org.apache.beam.sdk.transforms.CombineFnBase; +import org.apache.beam.sdk.transforms.windowing.BoundedWindow; +import org.apache.beam.sdk.util.SideInputReader; +import org.apache.beam.sdk.util.WindowedValue; +import org.apache.beam.sdk.util.WindowingStrategy; +import org.apache.beam.sdk.values.KV; +import org.apache.flink.util.Collector; + +/** + * Abstract base for runners that execute a {@link org.apache.beam.sdk.transforms.Combine.PerKey}. + * This unifies processing of merging/non-merging and partial/final combines. + * + * <p>The inputs to {@link #combine( + * FlinkCombiner, WindowingStrategy, SideInputReader, PipelineOptions, Iterable, Collector)} are + * elements of the same key but for different windows. + */ +public abstract class AbstractFlinkCombineRunner< + K, InputT, AccumT, OutputT, W extends BoundedWindow> { + + /** + * Consumes {@link WindowedValue WindowedValues} and writes the combined results to the given {@link Collector}. + */ + public abstract void combine( + FlinkCombiner<K, InputT, AccumT, OutputT> flinkCombiner, + WindowingStrategy<Object, W> windowingStrategy, + SideInputReader sideInputReader, + PipelineOptions options, + Iterable<WindowedValue<KV<K, InputT>>> elements, + Collector<WindowedValue<KV<K, OutputT>>> out) throws Exception; + + /** + * Adapter interface that allows using a {@link CombineFnBase.PerKeyCombineFn} to either produce + * the {@code AccumT} as output or to combine several accumulators into an {@code OutputT}. + * The former would be used for a partial combine while the latter is used for the final merging + * of accumulators.
+ */ + public interface FlinkCombiner<K, InputT, AccumT, OutputT>{ + + AccumT firstInput(K key, InputT value, PipelineOptions options, + SideInputReader sideInputReader, Collection<? extends BoundedWindow> windows); + + AccumT addInput(K key, AccumT accumulator, InputT value, PipelineOptions options, + SideInputReader sideInputReader, Collection<? extends BoundedWindow> windows); + + OutputT extractOutput(K key, AccumT accumulator, PipelineOptions options, + SideInputReader sideInputReader, + Collection<? extends BoundedWindow> windows); + } + + /** + * A straight wrapper of {@link CombineFnBase.PerKeyCombineFn} that takes in {@code InputT} + * and emits {@code OutputT}. + */ + public static class CompleteFlinkCombiner<K, InputT, AccumT, OutputT> implements + FlinkCombiner<K, InputT, AccumT, OutputT> { + + private final PerKeyCombineFnRunner<K, InputT, AccumT, OutputT> combineFnRunner; + + public CompleteFlinkCombiner( + CombineFnBase.PerKeyCombineFn<K, InputT, AccumT, OutputT> combineFn) { + combineFnRunner = PerKeyCombineFnRunners.create(combineFn); + } + + @Override + public AccumT firstInput( + K key, InputT value, PipelineOptions options, SideInputReader sideInputReader, + Collection<? extends BoundedWindow> windows) { + AccumT accumulator = + combineFnRunner.createAccumulator(key, options, sideInputReader, windows); + return combineFnRunner.addInput(key, accumulator, value, options, sideInputReader, windows); + } + + @Override + public AccumT addInput( + K key, AccumT accumulator, InputT value, PipelineOptions options, + SideInputReader sideInputReader, Collection<? extends BoundedWindow> windows) { + return combineFnRunner.addInput(key, accumulator, value, options, sideInputReader, windows); + } + + @Override + public OutputT extractOutput( + K key, AccumT accumulator, PipelineOptions options, SideInputReader sideInputReader, + Collection<? 
extends BoundedWindow> windows) { + return combineFnRunner.extractOutput(key, accumulator, options, sideInputReader, windows); + } + } + + /** + * A partial combiner that takes in {@code InputT} and produces {@code AccumT}. + */ + public static class PartialFlinkCombiner<K, InputT, AccumT> implements + FlinkCombiner<K, InputT, AccumT, AccumT> { + + private final PerKeyCombineFnRunner<K, InputT, AccumT, ?> combineFnRunner; + + public PartialFlinkCombiner(CombineFnBase.PerKeyCombineFn<K, InputT, AccumT, ?> combineFn) { + combineFnRunner = PerKeyCombineFnRunners.create(combineFn); + } + + @Override + public AccumT firstInput( + K key, InputT value, PipelineOptions options, SideInputReader sideInputReader, + Collection<? extends BoundedWindow> windows) { + AccumT accumulator = + combineFnRunner.createAccumulator(key, options, sideInputReader, windows); + return combineFnRunner.addInput(key, accumulator, value, options, sideInputReader, windows); + } + + @Override + public AccumT addInput( + K key, AccumT accumulator, InputT value, PipelineOptions options, + SideInputReader sideInputReader, Collection<? extends BoundedWindow> windows) { + return combineFnRunner.addInput(key, accumulator, value, options, sideInputReader, windows); + } + + @Override + public AccumT extractOutput( + K key, AccumT accumulator, PipelineOptions options, SideInputReader sideInputReader, + Collection<? extends BoundedWindow> windows) { + return accumulator; + } + } + + /** + * A final combiner that takes in {@code AccumT} and produces {@code OutputT}. 
+ */ + public static class FinalFlinkCombiner<K, AccumT, OutputT> implements + FlinkCombiner<K, AccumT, AccumT, OutputT> { + + private final PerKeyCombineFnRunner<K, ?, AccumT, OutputT> combineFnRunner; + + public FinalFlinkCombiner(CombineFnBase.PerKeyCombineFn<K, ?, AccumT, OutputT> combineFn) { + combineFnRunner = PerKeyCombineFnRunners.create(combineFn); + } + + @Override + public AccumT firstInput( + K key, AccumT value, PipelineOptions options, SideInputReader sideInputReader, + Collection<? extends BoundedWindow> windows) { + return value; + } + + @Override + public AccumT addInput( + K key, AccumT accumulator, AccumT value, PipelineOptions options, + SideInputReader sideInputReader, Collection<? extends BoundedWindow> windows) { + return combineFnRunner.mergeAccumulators( + key, ImmutableList.of(accumulator, value), options, sideInputReader, windows); + } + + @Override + public OutputT extractOutput( + K key, AccumT accumulator, PipelineOptions options, SideInputReader sideInputReader, + Collection<? 
extends BoundedWindow> windows) { + return combineFnRunner.extractOutput(key, accumulator, options, sideInputReader, windows); + } + } +} http://git-wip-us.apache.org/repos/asf/beam/blob/d4a9f60d/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkMergingNonShuffleReduceFunction.java ---------------------------------------------------------------------- diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkMergingNonShuffleReduceFunction.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkMergingNonShuffleReduceFunction.java index 26fd0b4..3712598 100644 --- a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkMergingNonShuffleReduceFunction.java +++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkMergingNonShuffleReduceFunction.java @@ -17,46 +17,33 @@ */ package org.apache.beam.runners.flink.translation.functions; -import com.google.common.collect.Iterables; -import com.google.common.collect.Lists; -import java.util.Collections; -import java.util.Comparator; -import java.util.Iterator; -import java.util.List; import java.util.Map; -import org.apache.beam.runners.core.PerKeyCombineFnRunner; -import org.apache.beam.runners.core.PerKeyCombineFnRunners; import org.apache.beam.runners.flink.translation.utils.SerializedPipelineOptions; import org.apache.beam.sdk.options.PipelineOptions; import org.apache.beam.sdk.transforms.CombineFnBase; import org.apache.beam.sdk.transforms.windowing.BoundedWindow; import org.apache.beam.sdk.transforms.windowing.IntervalWindow; -import org.apache.beam.sdk.transforms.windowing.OutputTimeFn; -import org.apache.beam.sdk.transforms.windowing.PaneInfo; import org.apache.beam.sdk.util.WindowedValue; import org.apache.beam.sdk.util.WindowingStrategy; import org.apache.beam.sdk.values.KV; import org.apache.beam.sdk.values.PCollectionView; import 
org.apache.flink.api.common.functions.RichGroupReduceFunction; import org.apache.flink.util.Collector; -import org.joda.time.Instant; /** - * Special version of {@link FlinkReduceFunction} that supports merging windows. This - * assumes that the windows are {@link IntervalWindow IntervalWindows} and exhibits the - * same behaviour as {@code MergeOverlappingIntervalWindows}. + * Special version of {@link FlinkReduceFunction} that supports merging windows. * * <p>This is different from the pair of function for the non-merging windows case * in that we cannot do combining before the shuffle because elements would not * yet be in their correct windows for side-input access. */ public class FlinkMergingNonShuffleReduceFunction< - K, InputT, AccumT, OutputT, W extends IntervalWindow> + K, InputT, AccumT, OutputT, W extends BoundedWindow> extends RichGroupReduceFunction<WindowedValue<KV<K, InputT>>, WindowedValue<KV<K, OutputT>>> { private final CombineFnBase.PerKeyCombineFn<K, InputT, AccumT, OutputT> combineFn; - private final WindowingStrategy<?, W> windowingStrategy; + private final WindowingStrategy<Object, W> windowingStrategy; private final Map<PCollectionView<?>, WindowingStrategy<?, ?>> sideInputs; @@ -64,7 +51,7 @@ public class FlinkMergingNonShuffleReduceFunction< public FlinkMergingNonShuffleReduceFunction( CombineFnBase.PerKeyCombineFn<K, InputT, AccumT, OutputT> keyedCombineFn, - WindowingStrategy<?, W> windowingStrategy, + WindowingStrategy<Object, W> windowingStrategy, Map<PCollectionView<?>, WindowingStrategy<?, ?>> sideInputs, PipelineOptions pipelineOptions) { @@ -87,142 +74,22 @@ public class FlinkMergingNonShuffleReduceFunction< FlinkSideInputReader sideInputReader = new FlinkSideInputReader(sideInputs, getRuntimeContext()); - PerKeyCombineFnRunner<K, InputT, AccumT, OutputT> combineFnRunner = - PerKeyCombineFnRunners.create(combineFn); - - @SuppressWarnings("unchecked") - OutputTimeFn<? super BoundedWindow> outputTimeFn = - (OutputTimeFn<? 
super BoundedWindow>) windowingStrategy.getOutputTimeFn(); - - // get all elements so that we can sort them, has to fit into - // memory - // this seems very unprudent, but correct, for now - List<WindowedValue<KV<K, InputT>>> sortedInput = Lists.newArrayList(); - for (WindowedValue<KV<K, InputT>> inputValue : elements) { - for (WindowedValue<KV<K, InputT>> exploded : inputValue.explodeWindows()) { - sortedInput.add(exploded); - } - } - Collections.sort(sortedInput, new Comparator<WindowedValue<KV<K, InputT>>>() { - @Override - public int compare( - WindowedValue<KV<K, InputT>> o1, - WindowedValue<KV<K, InputT>> o2) { - return Iterables.getOnlyElement(o1.getWindows()).maxTimestamp() - .compareTo(Iterables.getOnlyElement(o2.getWindows()).maxTimestamp()); - } - }); - - // merge windows, we have to do it in an extra pre-processing step and - // can't do it as we go since the window of early elements would not - // be correct when calling the CombineFn - mergeWindow(sortedInput); - - // iterate over the elements that are sorted by window timestamp - final Iterator<WindowedValue<KV<K, InputT>>> iterator = sortedInput.iterator(); - - // create accumulator using the first elements key - WindowedValue<KV<K, InputT>> currentValue = iterator.next(); - K key = currentValue.getValue().getKey(); - IntervalWindow currentWindow = - (IntervalWindow) Iterables.getOnlyElement(currentValue.getWindows()); - InputT firstValue = currentValue.getValue().getValue(); - AccumT accumulator = - combineFnRunner.createAccumulator(key, options, sideInputReader, currentValue.getWindows()); - accumulator = combineFnRunner.addInput(key, accumulator, firstValue, - options, sideInputReader, currentValue.getWindows()); - - // we use this to keep track of the timestamps assigned by the OutputTimeFn - Instant windowTimestamp = - outputTimeFn.assignOutputTime(currentValue.getTimestamp(), currentWindow); - - while (iterator.hasNext()) { - WindowedValue<KV<K, InputT>> nextValue = iterator.next(); - 
IntervalWindow nextWindow = - (IntervalWindow) Iterables.getOnlyElement(nextValue.getWindows()); - - if (currentWindow.equals(nextWindow)) { - // continue accumulating and merge windows - - InputT value = nextValue.getValue().getValue(); - accumulator = combineFnRunner.addInput(key, accumulator, value, - options, sideInputReader, currentValue.getWindows()); - - windowTimestamp = outputTimeFn.combine( - windowTimestamp, - outputTimeFn.assignOutputTime(nextValue.getTimestamp(), currentWindow)); - - } else { - // emit the value that we currently have - out.collect( - WindowedValue.of( - KV.of(key, combineFnRunner.extractOutput(key, accumulator, - options, sideInputReader, currentValue.getWindows())), - windowTimestamp, - currentWindow, - PaneInfo.NO_FIRING)); - - currentWindow = nextWindow; - currentValue = nextValue; - InputT value = nextValue.getValue().getValue(); - accumulator = combineFnRunner.createAccumulator(key, - options, sideInputReader, currentValue.getWindows()); - accumulator = combineFnRunner.addInput(key, accumulator, value, - options, sideInputReader, currentValue.getWindows()); - windowTimestamp = outputTimeFn.assignOutputTime(nextValue.getTimestamp(), currentWindow); - } + AbstractFlinkCombineRunner<K, InputT, AccumT, OutputT, W> reduceRunner; + if (windowingStrategy.getWindowFn().windowCoder().equals(IntervalWindow.getCoder())) { + reduceRunner = new SortingFlinkCombineRunner<>(); + } else { + reduceRunner = new HashingFlinkCombineRunner<>(); } - // emit the final accumulator - out.collect( - WindowedValue.of( - KV.of(key, combineFnRunner.extractOutput(key, accumulator, - options, sideInputReader, currentValue.getWindows())), - windowTimestamp, - currentWindow, - PaneInfo.NO_FIRING)); - } + reduceRunner.combine( + new AbstractFlinkCombineRunner.CompleteFlinkCombiner<>(combineFn), + windowingStrategy, + sideInputReader, + options, + elements, + out); - /** - * Merge windows. This assumes that the list of elements is sorted by window-end timestamp. 
- * This replaces windows in the input list. - */ - private void mergeWindow(List<WindowedValue<KV<K, InputT>>> elements) { - int currentStart = 0; - IntervalWindow currentWindow = - (IntervalWindow) Iterables.getOnlyElement(elements.get(0).getWindows()); - - for (int i = 1; i < elements.size(); i++) { - WindowedValue<KV<K, InputT>> nextValue = elements.get(i); - IntervalWindow nextWindow = - (IntervalWindow) Iterables.getOnlyElement(nextValue.getWindows()); - if (currentWindow.intersects(nextWindow)) { - // we continue - currentWindow = currentWindow.span(nextWindow); - } else { - // retrofit the merged window to all windows up to "currentStart" - for (int j = i - 1; j >= currentStart; j--) { - WindowedValue<KV<K, InputT>> value = elements.get(j); - elements.set( - j, - WindowedValue.of( - value.getValue(), value.getTimestamp(), currentWindow, value.getPane())); - } - currentStart = i; - currentWindow = nextWindow; - } - } - if (currentStart < elements.size() - 1) { - // we have to retrofit the last batch - for (int j = elements.size() - 1; j >= currentStart; j--) { - WindowedValue<KV<K, InputT>> value = elements.get(j); - elements.set( - j, - WindowedValue.of( - value.getValue(), value.getTimestamp(), currentWindow, value.getPane())); - } - } } } http://git-wip-us.apache.org/repos/asf/beam/blob/d4a9f60d/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkMergingPartialReduceFunction.java ---------------------------------------------------------------------- diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkMergingPartialReduceFunction.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkMergingPartialReduceFunction.java deleted file mode 100644 index c68f155..0000000 --- a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkMergingPartialReduceFunction.java +++ /dev/null @@ -1,201 +0,0 @@ -/* - * Licensed to the 
Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.runners.flink.translation.functions; - -import com.google.common.collect.Iterables; -import com.google.common.collect.Lists; -import java.util.Collections; -import java.util.Comparator; -import java.util.Iterator; -import java.util.List; -import java.util.Map; -import org.apache.beam.runners.core.PerKeyCombineFnRunner; -import org.apache.beam.runners.core.PerKeyCombineFnRunners; -import org.apache.beam.sdk.options.PipelineOptions; -import org.apache.beam.sdk.transforms.CombineFnBase; -import org.apache.beam.sdk.transforms.windowing.BoundedWindow; -import org.apache.beam.sdk.transforms.windowing.IntervalWindow; -import org.apache.beam.sdk.transforms.windowing.OutputTimeFn; -import org.apache.beam.sdk.transforms.windowing.PaneInfo; -import org.apache.beam.sdk.util.WindowedValue; -import org.apache.beam.sdk.util.WindowingStrategy; -import org.apache.beam.sdk.values.KV; -import org.apache.beam.sdk.values.PCollectionView; -import org.apache.flink.util.Collector; -import org.joda.time.Instant; - -/** - * Special version of {@link FlinkPartialReduceFunction} that supports merging windows. 
This - * assumes that the windows are {@link IntervalWindow IntervalWindows} and exhibits the - * same behaviour as {@code MergeOverlappingIntervalWindows}. - */ -public class FlinkMergingPartialReduceFunction<K, InputT, AccumT, W extends IntervalWindow> - extends FlinkPartialReduceFunction<K, InputT, AccumT, W> { - - public FlinkMergingPartialReduceFunction( - CombineFnBase.PerKeyCombineFn<K, InputT, AccumT, ?> combineFn, - WindowingStrategy<?, W> windowingStrategy, - Map<PCollectionView<?>, WindowingStrategy<?, ?>> sideInputs, - PipelineOptions pipelineOptions) { - super(combineFn, windowingStrategy, sideInputs, pipelineOptions); - } - - @Override - public void combine( - Iterable<WindowedValue<KV<K, InputT>>> elements, - Collector<WindowedValue<KV<K, AccumT>>> out) throws Exception { - - PipelineOptions options = serializedOptions.getPipelineOptions(); - - FlinkSideInputReader sideInputReader = - new FlinkSideInputReader(sideInputs, getRuntimeContext()); - - PerKeyCombineFnRunner<K, InputT, AccumT, ?> combineFnRunner = - PerKeyCombineFnRunners.create(combineFn); - - @SuppressWarnings("unchecked") - OutputTimeFn<? super BoundedWindow> outputTimeFn = - (OutputTimeFn<? 
super BoundedWindow>) windowingStrategy.getOutputTimeFn(); - - // get all elements so that we can sort them, has to fit into - // memory - // this seems very unprudent, but correct, for now - List<WindowedValue<KV<K, InputT>>> sortedInput = Lists.newArrayList(); - for (WindowedValue<KV<K, InputT>> inputValue : elements) { - for (WindowedValue<KV<K, InputT>> exploded : inputValue.explodeWindows()) { - sortedInput.add(exploded); - } - } - Collections.sort(sortedInput, new Comparator<WindowedValue<KV<K, InputT>>>() { - @Override - public int compare( - WindowedValue<KV<K, InputT>> o1, - WindowedValue<KV<K, InputT>> o2) { - return Iterables.getOnlyElement(o1.getWindows()).maxTimestamp() - .compareTo(Iterables.getOnlyElement(o2.getWindows()).maxTimestamp()); - } - }); - - // merge windows, we have to do it in an extra pre-processing step and - // can't do it as we go since the window of early elements would not - // be correct when calling the CombineFn - mergeWindow(sortedInput); - - // iterate over the elements that are sorted by window timestamp - final Iterator<WindowedValue<KV<K, InputT>>> iterator = sortedInput.iterator(); - - // create accumulator using the first elements key - WindowedValue<KV<K, InputT>> currentValue = iterator.next(); - K key = currentValue.getValue().getKey(); - IntervalWindow currentWindow = - (IntervalWindow) Iterables.getOnlyElement(currentValue.getWindows()); - InputT firstValue = currentValue.getValue().getValue(); - AccumT accumulator = combineFnRunner.createAccumulator(key, - options, sideInputReader, currentValue.getWindows()); - accumulator = combineFnRunner.addInput(key, accumulator, firstValue, - options, sideInputReader, currentValue.getWindows()); - - // we use this to keep track of the timestamps assigned by the OutputTimeFn - Instant windowTimestamp = - outputTimeFn.assignOutputTime(currentValue.getTimestamp(), currentWindow); - - while (iterator.hasNext()) { - WindowedValue<KV<K, InputT>> nextValue = iterator.next(); - 
IntervalWindow nextWindow = (IntervalWindow) Iterables.getOnlyElement(nextValue.getWindows()); - - if (currentWindow.equals(nextWindow)) { - // continue accumulating and merge windows - - InputT value = nextValue.getValue().getValue(); - accumulator = combineFnRunner.addInput(key, accumulator, value, - options, sideInputReader, currentValue.getWindows()); - - windowTimestamp = outputTimeFn.combine( - windowTimestamp, - outputTimeFn.assignOutputTime(nextValue.getTimestamp(), currentWindow)); - - } else { - // emit the value that we currently have - out.collect( - WindowedValue.of( - KV.of(key, accumulator), - windowTimestamp, - currentWindow, - PaneInfo.NO_FIRING)); - - currentWindow = nextWindow; - currentValue = nextValue; - InputT value = nextValue.getValue().getValue(); - accumulator = combineFnRunner.createAccumulator(key, - options, sideInputReader, currentValue.getWindows()); - accumulator = combineFnRunner.addInput(key, accumulator, value, - options, sideInputReader, currentValue.getWindows()); - windowTimestamp = outputTimeFn.assignOutputTime(nextValue.getTimestamp(), currentWindow); - } - } - - // emit the final accumulator - out.collect( - WindowedValue.of( - KV.of(key, accumulator), - windowTimestamp, - currentWindow, - PaneInfo.NO_FIRING)); - } - - /** - * Merge windows. This assumes that the list of elements is sorted by window-end timestamp. - * This replaces windows in the input list. 
- */ - private void mergeWindow(List<WindowedValue<KV<K, InputT>>> elements) { - int currentStart = 0; - IntervalWindow currentWindow = - (IntervalWindow) Iterables.getOnlyElement(elements.get(0).getWindows()); - - for (int i = 1; i < elements.size(); i++) { - WindowedValue<KV<K, InputT>> nextValue = elements.get(i); - IntervalWindow nextWindow = - (IntervalWindow) Iterables.getOnlyElement(nextValue.getWindows()); - if (currentWindow.intersects(nextWindow)) { - // we continue - currentWindow = currentWindow.span(nextWindow); - } else { - // retrofit the merged window to all windows up to "currentStart" - for (int j = i - 1; j >= currentStart; j--) { - WindowedValue<KV<K, InputT>> value = elements.get(j); - elements.set( - j, - WindowedValue.of( - value.getValue(), value.getTimestamp(), currentWindow, value.getPane())); - } - currentStart = i; - currentWindow = nextWindow; - } - } - if (currentStart < elements.size() - 1) { - // we have to retrofit the last batch - for (int j = elements.size() - 1; j >= currentStart; j--) { - WindowedValue<KV<K, InputT>> value = elements.get(j); - elements.set( - j, - WindowedValue.of( - value.getValue(), value.getTimestamp(), currentWindow, value.getPane())); - } - } - } -} http://git-wip-us.apache.org/repos/asf/beam/blob/d4a9f60d/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkMergingReduceFunction.java ---------------------------------------------------------------------- diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkMergingReduceFunction.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkMergingReduceFunction.java deleted file mode 100644 index 84b3adc..0000000 --- a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkMergingReduceFunction.java +++ /dev/null @@ -1,199 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor 
license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.runners.flink.translation.functions; - -import com.google.common.collect.ImmutableList; -import com.google.common.collect.Iterables; -import com.google.common.collect.Lists; -import java.util.ArrayList; -import java.util.Collections; -import java.util.Comparator; -import java.util.Iterator; -import java.util.List; -import java.util.Map; -import org.apache.beam.runners.core.PerKeyCombineFnRunner; -import org.apache.beam.runners.core.PerKeyCombineFnRunners; -import org.apache.beam.sdk.options.PipelineOptions; -import org.apache.beam.sdk.transforms.CombineFnBase; -import org.apache.beam.sdk.transforms.windowing.BoundedWindow; -import org.apache.beam.sdk.transforms.windowing.IntervalWindow; -import org.apache.beam.sdk.transforms.windowing.OutputTimeFn; -import org.apache.beam.sdk.transforms.windowing.PaneInfo; -import org.apache.beam.sdk.util.WindowedValue; -import org.apache.beam.sdk.util.WindowingStrategy; -import org.apache.beam.sdk.values.KV; -import org.apache.beam.sdk.values.PCollectionView; -import org.apache.flink.util.Collector; -import org.joda.time.Instant; - -/** - * Special version of {@link FlinkReduceFunction} that supports merging windows. 
This - * assumes that the windows are {@link IntervalWindow IntervalWindows} and exhibits the - * same behaviour as {@code MergeOverlappingIntervalWindows}. - */ -public class FlinkMergingReduceFunction<K, AccumT, OutputT, W extends IntervalWindow> - extends FlinkReduceFunction<K, AccumT, OutputT, W> { - - public FlinkMergingReduceFunction( - CombineFnBase.PerKeyCombineFn<K, ?, AccumT, OutputT> keyedCombineFn, - WindowingStrategy<?, W> windowingStrategy, - Map<PCollectionView<?>, WindowingStrategy<?, ?>> sideInputs, - PipelineOptions pipelineOptions) { - super(keyedCombineFn, windowingStrategy, sideInputs, pipelineOptions); - } - - @Override - public void reduce( - Iterable<WindowedValue<KV<K, AccumT>>> elements, - Collector<WindowedValue<KV<K, OutputT>>> out) throws Exception { - - PipelineOptions options = serializedOptions.getPipelineOptions(); - - FlinkSideInputReader sideInputReader = - new FlinkSideInputReader(sideInputs, getRuntimeContext()); - - PerKeyCombineFnRunner<K, ?, AccumT, OutputT> combineFnRunner = - PerKeyCombineFnRunners.create(combineFn); - - @SuppressWarnings("unchecked") - OutputTimeFn<? super BoundedWindow> outputTimeFn = - (OutputTimeFn<? 
super BoundedWindow>) windowingStrategy.getOutputTimeFn(); - - // get all elements so that we can sort them, has to fit into - // memory - // this seems very unprudent, but correct, for now - ArrayList<WindowedValue<KV<K, AccumT>>> sortedInput = Lists.newArrayList(); - for (WindowedValue<KV<K, AccumT>> inputValue : elements) { - for (WindowedValue<KV<K, AccumT>> exploded : inputValue.explodeWindows()) { - sortedInput.add(exploded); - } - } - Collections.sort(sortedInput, new Comparator<WindowedValue<KV<K, AccumT>>>() { - @Override - public int compare( - WindowedValue<KV<K, AccumT>> o1, - WindowedValue<KV<K, AccumT>> o2) { - return Iterables.getOnlyElement(o1.getWindows()).maxTimestamp() - .compareTo(Iterables.getOnlyElement(o2.getWindows()).maxTimestamp()); - } - }); - - // merge windows, we have to do it in an extra pre-processing step and - // can't do it as we go since the window of early elements would not - // be correct when calling the CombineFn - mergeWindow(sortedInput); - - // iterate over the elements that are sorted by window timestamp - final Iterator<WindowedValue<KV<K, AccumT>>> iterator = sortedInput.iterator(); - - // get the first accumulator - WindowedValue<KV<K, AccumT>> currentValue = iterator.next(); - K key = currentValue.getValue().getKey(); - IntervalWindow currentWindow = - (IntervalWindow) Iterables.getOnlyElement(currentValue.getWindows()); - AccumT accumulator = currentValue.getValue().getValue(); - - // we use this to keep track of the timestamps assigned by the OutputTimeFn, - // in FlinkPartialReduceFunction we already merge the timestamps assigned - // to individual elements, here we just merge them - List<Instant> windowTimestamps = new ArrayList<>(); - windowTimestamps.add(currentValue.getTimestamp()); - - while (iterator.hasNext()) { - WindowedValue<KV<K, AccumT>> nextValue = iterator.next(); - IntervalWindow nextWindow = - (IntervalWindow) Iterables.getOnlyElement(nextValue.getWindows()); - - if 
(nextWindow.equals(currentWindow)) { - // continue accumulating and merge windows - - accumulator = combineFnRunner.mergeAccumulators( - key, ImmutableList.of(accumulator, nextValue.getValue().getValue()), - options, sideInputReader, currentValue.getWindows()); - - windowTimestamps.add(nextValue.getTimestamp()); - } else { - out.collect( - WindowedValue.of( - KV.of(key, combineFnRunner.extractOutput(key, accumulator, - options, sideInputReader, currentValue.getWindows())), - outputTimeFn.merge(currentWindow, windowTimestamps), - currentWindow, - PaneInfo.NO_FIRING)); - - windowTimestamps.clear(); - - currentWindow = nextWindow; - currentValue = nextValue; - accumulator = nextValue.getValue().getValue(); - windowTimestamps.add(nextValue.getTimestamp()); - } - } - - // emit the final accumulator - out.collect( - WindowedValue.of( - KV.of(key, combineFnRunner.extractOutput(key, accumulator, - options, sideInputReader, currentValue.getWindows())), - outputTimeFn.merge(currentWindow, windowTimestamps), - currentWindow, - PaneInfo.NO_FIRING)); - } - - /** - * Merge windows. This assumes that the list of elements is sorted by window-end timestamp. - * This replaces windows in the input list. 
- */ - private void mergeWindow(List<WindowedValue<KV<K, AccumT>>> elements) { - int currentStart = 0; - IntervalWindow currentWindow = - (IntervalWindow) Iterables.getOnlyElement(elements.get(0).getWindows()); - - for (int i = 1; i < elements.size(); i++) { - WindowedValue<KV<K, AccumT>> nextValue = elements.get(i); - IntervalWindow nextWindow = - (IntervalWindow) Iterables.getOnlyElement(nextValue.getWindows()); - if (currentWindow.intersects(nextWindow)) { - // we continue - currentWindow = currentWindow.span(nextWindow); - } else { - // retrofit the merged window to all windows up to "currentStart" - for (int j = i - 1; j >= currentStart; j--) { - WindowedValue<KV<K, AccumT>> value = elements.get(j); - elements.set( - j, - WindowedValue.of( - value.getValue(), value.getTimestamp(), currentWindow, value.getPane())); - } - currentStart = i; - currentWindow = nextWindow; - } - } - if (currentStart < elements.size() - 1) { - // we have to retrofit the last batch - for (int j = elements.size() - 1; j >= currentStart; j--) { - WindowedValue<KV<K, AccumT>> value = elements.get(j); - elements.set( - j, - WindowedValue.of( - value.getValue(), value.getTimestamp(), currentWindow, value.getPane())); - } - } - } - -} http://git-wip-us.apache.org/repos/asf/beam/blob/d4a9f60d/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkPartialReduceFunction.java ---------------------------------------------------------------------- diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkPartialReduceFunction.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkPartialReduceFunction.java index 1d1ff9f..9a44840 100644 --- a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkPartialReduceFunction.java +++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkPartialReduceFunction.java @@ -17,28 +17,18 @@ */ 
package org.apache.beam.runners.flink.translation.functions; -import com.google.common.collect.Iterables; -import com.google.common.collect.Lists; -import java.util.ArrayList; -import java.util.Collections; -import java.util.Comparator; -import java.util.Iterator; import java.util.Map; -import org.apache.beam.runners.core.PerKeyCombineFnRunner; -import org.apache.beam.runners.core.PerKeyCombineFnRunners; import org.apache.beam.runners.flink.translation.utils.SerializedPipelineOptions; import org.apache.beam.sdk.options.PipelineOptions; import org.apache.beam.sdk.transforms.CombineFnBase; import org.apache.beam.sdk.transforms.windowing.BoundedWindow; -import org.apache.beam.sdk.transforms.windowing.OutputTimeFn; -import org.apache.beam.sdk.transforms.windowing.PaneInfo; +import org.apache.beam.sdk.transforms.windowing.IntervalWindow; import org.apache.beam.sdk.util.WindowedValue; import org.apache.beam.sdk.util.WindowingStrategy; import org.apache.beam.sdk.values.KV; import org.apache.beam.sdk.values.PCollectionView; import org.apache.flink.api.common.functions.RichGroupCombineFunction; import org.apache.flink.util.Collector; -import org.joda.time.Instant; /** * This is is the first step for executing a {@link org.apache.beam.sdk.transforms.Combine.PerKey} @@ -54,7 +44,7 @@ public class FlinkPartialReduceFunction<K, InputT, AccumT, W extends BoundedWind protected final CombineFnBase.PerKeyCombineFn<K, InputT, AccumT, ?> combineFn; - protected final WindowingStrategy<?, W> windowingStrategy; + protected final WindowingStrategy<Object, W> windowingStrategy; protected final SerializedPipelineOptions serializedOptions; @@ -62,7 +52,7 @@ public class FlinkPartialReduceFunction<K, InputT, AccumT, W extends BoundedWind public FlinkPartialReduceFunction( CombineFnBase.PerKeyCombineFn<K, InputT, AccumT, ?> combineFn, - WindowingStrategy<?, W> windowingStrategy, + WindowingStrategy<Object, W> windowingStrategy, Map<PCollectionView<?>, WindowingStrategy<?, ?>> sideInputs, 
PipelineOptions pipelineOptions) { @@ -83,90 +73,22 @@ public class FlinkPartialReduceFunction<K, InputT, AccumT, W extends BoundedWind FlinkSideInputReader sideInputReader = new FlinkSideInputReader(sideInputs, getRuntimeContext()); - PerKeyCombineFnRunner<K, InputT, AccumT, ?> combineFnRunner = - PerKeyCombineFnRunners.create(combineFn); - - @SuppressWarnings("unchecked") - OutputTimeFn<? super BoundedWindow> outputTimeFn = - (OutputTimeFn<? super BoundedWindow>) windowingStrategy.getOutputTimeFn(); - - // get all elements so that we can sort them, has to fit into - // memory - // this seems very unprudent, but correct, for now - ArrayList<WindowedValue<KV<K, InputT>>> sortedInput = Lists.newArrayList(); - for (WindowedValue<KV<K, InputT>> inputValue : elements) { - for (WindowedValue<KV<K, InputT>> exploded : inputValue.explodeWindows()) { - sortedInput.add(exploded); - } - } - Collections.sort(sortedInput, new Comparator<WindowedValue<KV<K, InputT>>>() { - @Override - public int compare( - WindowedValue<KV<K, InputT>> o1, - WindowedValue<KV<K, InputT>> o2) { - return Iterables.getOnlyElement(o1.getWindows()).maxTimestamp() - .compareTo(Iterables.getOnlyElement(o2.getWindows()).maxTimestamp()); - } - }); - - // iterate over the elements that are sorted by window timestamp - // - final Iterator<WindowedValue<KV<K, InputT>>> iterator = sortedInput.iterator(); - - // create accumulator using the first elements key - WindowedValue<KV<K, InputT>> currentValue = iterator.next(); - K key = currentValue.getValue().getKey(); - BoundedWindow currentWindow = Iterables.getFirst(currentValue.getWindows(), null); - InputT firstValue = currentValue.getValue().getValue(); - AccumT accumulator = combineFnRunner.createAccumulator(key, - options, sideInputReader, currentValue.getWindows()); - accumulator = combineFnRunner.addInput(key, accumulator, firstValue, - options, sideInputReader, currentValue.getWindows()); - - // we use this to keep track of the timestamps assigned by the 
OutputTimeFn - Instant windowTimestamp = - outputTimeFn.assignOutputTime(currentValue.getTimestamp(), currentWindow); - - while (iterator.hasNext()) { - WindowedValue<KV<K, InputT>> nextValue = iterator.next(); - BoundedWindow nextWindow = Iterables.getOnlyElement(nextValue.getWindows()); - - if (nextWindow.equals(currentWindow)) { - // continue accumulating - InputT value = nextValue.getValue().getValue(); - accumulator = combineFnRunner.addInput(key, accumulator, value, - options, sideInputReader, currentValue.getWindows()); - - windowTimestamp = outputTimeFn.combine( - windowTimestamp, - outputTimeFn.assignOutputTime(nextValue.getTimestamp(), currentWindow)); - - } else { - // emit the value that we currently have - out.collect( - WindowedValue.of( - KV.of(key, accumulator), - windowTimestamp, - currentWindow, - PaneInfo.NO_FIRING)); - - currentWindow = nextWindow; - currentValue = nextValue; - InputT value = nextValue.getValue().getValue(); - accumulator = combineFnRunner.createAccumulator(key, - options, sideInputReader, currentValue.getWindows()); - accumulator = combineFnRunner.addInput(key, accumulator, value, - options, sideInputReader, currentValue.getWindows()); - windowTimestamp = outputTimeFn.assignOutputTime(nextValue.getTimestamp(), currentWindow); - } + AbstractFlinkCombineRunner<K, InputT, AccumT, AccumT, W> reduceRunner; + + if (!windowingStrategy.getWindowFn().isNonMerging() + && !windowingStrategy.getWindowFn().windowCoder().equals(IntervalWindow.getCoder())) { + reduceRunner = new HashingFlinkCombineRunner<>(); + } else { + reduceRunner = new SortingFlinkCombineRunner<>(); } - // emit the final accumulator - out.collect( - WindowedValue.of( - KV.of(key, accumulator), - windowTimestamp, - currentWindow, - PaneInfo.NO_FIRING)); + reduceRunner.combine( + new AbstractFlinkCombineRunner.PartialFlinkCombiner<>(combineFn), + windowingStrategy, + sideInputReader, + options, + elements, + out); + } } 
http://git-wip-us.apache.org/repos/asf/beam/blob/d4a9f60d/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkReduceFunction.java ---------------------------------------------------------------------- diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkReduceFunction.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkReduceFunction.java index 3e4f742..6c1a2e4 100644 --- a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkReduceFunction.java +++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/FlinkReduceFunction.java @@ -17,30 +17,18 @@ */ package org.apache.beam.runners.flink.translation.functions; -import com.google.common.collect.ImmutableList; -import com.google.common.collect.Iterables; -import com.google.common.collect.Lists; -import java.util.ArrayList; -import java.util.Collections; -import java.util.Comparator; -import java.util.Iterator; -import java.util.List; import java.util.Map; -import org.apache.beam.runners.core.PerKeyCombineFnRunner; -import org.apache.beam.runners.core.PerKeyCombineFnRunners; import org.apache.beam.runners.flink.translation.utils.SerializedPipelineOptions; import org.apache.beam.sdk.options.PipelineOptions; import org.apache.beam.sdk.transforms.CombineFnBase; import org.apache.beam.sdk.transforms.windowing.BoundedWindow; -import org.apache.beam.sdk.transforms.windowing.OutputTimeFn; -import org.apache.beam.sdk.transforms.windowing.PaneInfo; +import org.apache.beam.sdk.transforms.windowing.IntervalWindow; import org.apache.beam.sdk.util.WindowedValue; import org.apache.beam.sdk.util.WindowingStrategy; import org.apache.beam.sdk.values.KV; import org.apache.beam.sdk.values.PCollectionView; import org.apache.flink.api.common.functions.RichGroupReduceFunction; import org.apache.flink.util.Collector; -import org.joda.time.Instant; /** * This is the 
second part for executing a {@link org.apache.beam.sdk.transforms.Combine.PerKey} @@ -56,7 +44,7 @@ public class FlinkReduceFunction<K, AccumT, OutputT, W extends BoundedWindow> protected final CombineFnBase.PerKeyCombineFn<K, ?, AccumT, OutputT> combineFn; - protected final WindowingStrategy<?, W> windowingStrategy; + protected final WindowingStrategy<Object, W> windowingStrategy; protected final Map<PCollectionView<?>, WindowingStrategy<?, ?>> sideInputs; @@ -64,7 +52,7 @@ public class FlinkReduceFunction<K, AccumT, OutputT, W extends BoundedWindow> public FlinkReduceFunction( CombineFnBase.PerKeyCombineFn<K, ?, AccumT, OutputT> keyedCombineFn, - WindowingStrategy<?, W> windowingStrategy, + WindowingStrategy<Object, W> windowingStrategy, Map<PCollectionView<?>, WindowingStrategy<?, ?>> sideInputs, PipelineOptions pipelineOptions) { @@ -87,87 +75,23 @@ public class FlinkReduceFunction<K, AccumT, OutputT, W extends BoundedWindow> FlinkSideInputReader sideInputReader = new FlinkSideInputReader(sideInputs, getRuntimeContext()); - PerKeyCombineFnRunner<K, ?, AccumT, OutputT> combineFnRunner = - PerKeyCombineFnRunners.create(combineFn); + AbstractFlinkCombineRunner<K, AccumT, AccumT, OutputT, W> reduceRunner; - @SuppressWarnings("unchecked") - OutputTimeFn<? super BoundedWindow> outputTimeFn = - (OutputTimeFn<? 
super BoundedWindow>) windowingStrategy.getOutputTimeFn(); - - - // get all elements so that we can sort them, has to fit into - // memory - // this seems very unprudent, but correct, for now - ArrayList<WindowedValue<KV<K, AccumT>>> sortedInput = Lists.newArrayList(); - for (WindowedValue<KV<K, AccumT>> inputValue: elements) { - for (WindowedValue<KV<K, AccumT>> exploded: inputValue.explodeWindows()) { - sortedInput.add(exploded); - } + if (!windowingStrategy.getWindowFn().isNonMerging() + && !windowingStrategy.getWindowFn().windowCoder().equals(IntervalWindow.getCoder())) { + reduceRunner = new HashingFlinkCombineRunner<>(); + } else { + reduceRunner = new SortingFlinkCombineRunner<>(); } - Collections.sort(sortedInput, new Comparator<WindowedValue<KV<K, AccumT>>>() { - @Override - public int compare( - WindowedValue<KV<K, AccumT>> o1, - WindowedValue<KV<K, AccumT>> o2) { - return Iterables.getOnlyElement(o1.getWindows()).maxTimestamp() - .compareTo(Iterables.getOnlyElement(o2.getWindows()).maxTimestamp()); - } - }); - - // iterate over the elements that are sorted by window timestamp - // - final Iterator<WindowedValue<KV<K, AccumT>>> iterator = sortedInput.iterator(); - - // get the first accumulator - WindowedValue<KV<K, AccumT>> currentValue = iterator.next(); - K key = currentValue.getValue().getKey(); - BoundedWindow currentWindow = Iterables.getFirst(currentValue.getWindows(), null); - AccumT accumulator = currentValue.getValue().getValue(); - - // we use this to keep track of the timestamps assigned by the OutputTimeFn, - // in FlinkPartialReduceFunction we already merge the timestamps assigned - // to individual elements, here we just merge them - List<Instant> windowTimestamps = new ArrayList<>(); - windowTimestamps.add(currentValue.getTimestamp()); - - while (iterator.hasNext()) { - WindowedValue<KV<K, AccumT>> nextValue = iterator.next(); - BoundedWindow nextWindow = Iterables.getOnlyElement(nextValue.getWindows()); - - if 
(nextWindow.equals(currentWindow)) { - // continue accumulating - accumulator = combineFnRunner.mergeAccumulators( - key, ImmutableList.of(accumulator, nextValue.getValue().getValue()), - options, sideInputReader, currentValue.getWindows()); - - windowTimestamps.add(nextValue.getTimestamp()); - } else { - // emit the value that we currently have - out.collect( - WindowedValue.of( - KV.of(key, combineFnRunner.extractOutput(key, accumulator, - options, sideInputReader, currentValue.getWindows())), - outputTimeFn.merge(currentWindow, windowTimestamps), - currentWindow, - PaneInfo.NO_FIRING)); - - windowTimestamps.clear(); - - currentWindow = nextWindow; - currentValue = nextValue; - accumulator = nextValue.getValue().getValue(); - windowTimestamps.add(nextValue.getTimestamp()); - } - } + reduceRunner.combine( + new AbstractFlinkCombineRunner.FinalFlinkCombiner<>(combineFn), + windowingStrategy, + sideInputReader, + options, + elements, + out); - // emit the final accumulator - out.collect( - WindowedValue.of( - KV.of(key, combineFnRunner.extractOutput(key, accumulator, - options, sideInputReader, currentValue.getWindows())), - outputTimeFn.merge(currentWindow, windowTimestamps), - currentWindow, - PaneInfo.NO_FIRING)); } + } http://git-wip-us.apache.org/repos/asf/beam/blob/d4a9f60d/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/HashingFlinkCombineRunner.java ---------------------------------------------------------------------- diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/HashingFlinkCombineRunner.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/HashingFlinkCombineRunner.java new file mode 100644 index 0000000..b904bfe --- /dev/null +++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/HashingFlinkCombineRunner.java @@ -0,0 +1,173 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more 
contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.runners.flink.translation.functions; + +import com.google.common.collect.Iterables; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.Set; +import org.apache.beam.sdk.options.PipelineOptions; +import org.apache.beam.sdk.transforms.windowing.BoundedWindow; +import org.apache.beam.sdk.transforms.windowing.OutputTimeFn; +import org.apache.beam.sdk.transforms.windowing.PaneInfo; +import org.apache.beam.sdk.transforms.windowing.WindowFn; +import org.apache.beam.sdk.util.SideInputReader; +import org.apache.beam.sdk.util.WindowedValue; +import org.apache.beam.sdk.util.WindowingStrategy; +import org.apache.beam.sdk.values.KV; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.util.Collector; +import org.joda.time.Instant; + +/** + * A Flink combine runner that builds a map of merged windows and produces output after + * seeing all input. This is similar to what{@link org.apache.beam.runners.core.ReduceFnRunner} + * does. 
+ */ +public class HashingFlinkCombineRunner< + K, InputT, AccumT, OutputT, W extends BoundedWindow> + extends AbstractFlinkCombineRunner<K, InputT, AccumT, OutputT, W> { + + @Override + public void combine( + FlinkCombiner<K, InputT, AccumT, OutputT> flinkCombiner, + WindowingStrategy<Object, W> windowingStrategy, + SideInputReader sideInputReader, + PipelineOptions options, + Iterable<WindowedValue<KV<K, InputT>>> elements, + Collector<WindowedValue<KV<K, OutputT>>> out) throws Exception { + + + @SuppressWarnings("unchecked") + OutputTimeFn<? super BoundedWindow> outputTimeFn = + (OutputTimeFn<? super BoundedWindow>) windowingStrategy.getOutputTimeFn(); + + // Flink Iterable can be iterated over only once. + List<WindowedValue<KV<K, InputT>>> inputs = new ArrayList<>(); + Iterables.addAll(inputs, elements); + + Set<W> windows = collectWindows(inputs); + Map<W, W> windowToMergeResult = mergeWindows(windowingStrategy, windows); + + // Combine all windowedValues into map + Map<W, Tuple2<AccumT, Instant>> mapState = new HashMap<>(); + Iterator<WindowedValue<KV<K, InputT>>> iterator = inputs.iterator(); + WindowedValue<KV<K, InputT>> currentValue = iterator.next(); + K key = currentValue.getValue().getKey(); + do { + for (BoundedWindow w : currentValue.getWindows()) { + @SuppressWarnings("unchecked") + W currentWindow = (W) w; + W mergedWindow = windowToMergeResult.get(currentWindow); + mergedWindow = mergedWindow == null ? 
currentWindow : mergedWindow; + Set<W> singletonW = Collections.singleton(mergedWindow); + Tuple2<AccumT, Instant> accumAndInstant = mapState.get(mergedWindow); + if (accumAndInstant == null) { + AccumT accumT = flinkCombiner.firstInput(key, currentValue.getValue().getValue(), + options, sideInputReader, singletonW); + Instant windowTimestamp = + outputTimeFn.assignOutputTime(currentValue.getTimestamp(), mergedWindow); + accumAndInstant = new Tuple2<>(accumT, windowTimestamp); + mapState.put(mergedWindow, accumAndInstant); + } else { + accumAndInstant.f0 = flinkCombiner.addInput(key, accumAndInstant.f0, + currentValue.getValue().getValue(), options, sideInputReader, singletonW); + accumAndInstant.f1 = outputTimeFn.combine(accumAndInstant.f1, + outputTimeFn.assignOutputTime(currentValue.getTimestamp(), mergedWindow)); + } + } + if (iterator.hasNext()) { + currentValue = iterator.next(); + } else { + break; + } + } while (true); + + // Output the final value of combiners + for (Map.Entry<W, Tuple2<AccumT, Instant>> entry : mapState.entrySet()) { + AccumT accumulator = entry.getValue().f0; + Instant windowTimestamp = entry.getValue().f1; + out.collect( + WindowedValue.of( + KV.of(key, flinkCombiner.extractOutput(key, accumulator, + options, sideInputReader, Collections.singleton(entry.getKey()))), + windowTimestamp, + entry.getKey(), + PaneInfo.NO_FIRING)); + } + + } + + private Map<W, W> mergeWindows( + WindowingStrategy<Object, W> windowingStrategy, + Set<W> windows) throws Exception { + WindowFn<Object, W> windowFn = windowingStrategy.getWindowFn(); + + if (windowingStrategy.getWindowFn().isNonMerging()) { + // Return an empty map, indicating that every window is not merged. 
+ return Collections.emptyMap(); + } + + Map<W, W> windowToMergeResult = new HashMap<>(); + windowFn.mergeWindows(new MergeContextImpl(windowFn, windows, windowToMergeResult)); + return windowToMergeResult; + } + + private class MergeContextImpl extends WindowFn<Object, W>.MergeContext { + + private Set<W> windows; + private Map<W, W> windowToMergeResult; + + MergeContextImpl(WindowFn<Object, W> windowFn, Set<W> windows, Map<W, W> windowToMergeResult) { + windowFn.super(); + this.windows = windows; + this.windowToMergeResult = windowToMergeResult; + } + + @Override + public Collection<W> windows() { + return windows; + } + + @Override + public void merge(Collection<W> toBeMerged, W mergeResult) throws Exception { + for (W w : toBeMerged) { + windowToMergeResult.put(w, mergeResult); + } + } + } + + private Set<W> collectWindows(Iterable<WindowedValue<KV<K, InputT>>> values) { + Set<W> windows = new HashSet<>(); + for (WindowedValue<?> value : values) { + for (BoundedWindow untypedWindow : value.getWindows()) { + @SuppressWarnings("unchecked") + W window = (W) untypedWindow; + windows.add(window); + } + } + return windows; + } + +} http://git-wip-us.apache.org/repos/asf/beam/blob/d4a9f60d/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/SortingFlinkCombineRunner.java ---------------------------------------------------------------------- diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/SortingFlinkCombineRunner.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/SortingFlinkCombineRunner.java new file mode 100644 index 0000000..2967f2c --- /dev/null +++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/functions/SortingFlinkCombineRunner.java @@ -0,0 +1,185 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.runners.flink.translation.functions; + +import com.google.common.collect.Iterables; +import com.google.common.collect.Lists; +import java.util.Collections; +import java.util.Comparator; +import java.util.Iterator; +import java.util.List; +import org.apache.beam.sdk.options.PipelineOptions; +import org.apache.beam.sdk.transforms.windowing.BoundedWindow; +import org.apache.beam.sdk.transforms.windowing.IntervalWindow; +import org.apache.beam.sdk.transforms.windowing.OutputTimeFn; +import org.apache.beam.sdk.transforms.windowing.PaneInfo; +import org.apache.beam.sdk.util.SideInputReader; +import org.apache.beam.sdk.util.WindowedValue; +import org.apache.beam.sdk.util.WindowingStrategy; +import org.apache.beam.sdk.values.KV; +import org.apache.flink.util.Collector; +import org.joda.time.Instant; + +/** + * A Flink combine runner that first sorts the elements by window and then does one pass that + * merges windows and outputs results. 
+ */ +public class SortingFlinkCombineRunner<K, InputT, AccumT, OutputT, W extends BoundedWindow> + extends AbstractFlinkCombineRunner<K, InputT, AccumT, OutputT, W> { + + + @Override + public void combine( + FlinkCombiner<K, InputT, AccumT, OutputT> flinkCombiner, + WindowingStrategy<Object, W> windowingStrategy, + SideInputReader sideInputReader, + PipelineOptions options, + Iterable<WindowedValue<KV<K, InputT>>> elements, + Collector<WindowedValue<KV<K, OutputT>>> out) throws Exception { + + @SuppressWarnings("unchecked") + OutputTimeFn<? super BoundedWindow> outputTimeFn = + (OutputTimeFn<? super BoundedWindow>) windowingStrategy.getOutputTimeFn(); + + // get all elements so that we can sort them, has to fit into + // memory + // this seems very unprudent, but correct, for now + List<WindowedValue<KV<K, InputT>>> sortedInput = Lists.newArrayList(); + for (WindowedValue<KV<K, InputT>> inputValue : elements) { + for (WindowedValue<KV<K, InputT>> exploded : inputValue.explodeWindows()) { + sortedInput.add(exploded); + } + } + Collections.sort(sortedInput, new Comparator<WindowedValue<KV<K, InputT>>>() { + @Override + public int compare( + WindowedValue<KV<K, InputT>> o1, + WindowedValue<KV<K, InputT>> o2) { + return Iterables.getOnlyElement(o1.getWindows()).maxTimestamp() + .compareTo(Iterables.getOnlyElement(o2.getWindows()).maxTimestamp()); + } + }); + + if (!windowingStrategy.getWindowFn().isNonMerging()) { + // merge windows, we have to do it in an extra pre-processing step and + // can't do it as we go since the window of early elements would not + // be correct when calling the CombineFn + mergeWindow(sortedInput); + } + + // iterate over the elements that are sorted by window timestamp + final Iterator<WindowedValue<KV<K, InputT>>> iterator = sortedInput.iterator(); + + // create accumulator using the first elements key + WindowedValue<KV<K, InputT>> currentValue = iterator.next(); + K key = currentValue.getValue().getKey(); + BoundedWindow currentWindow = 
Iterables.getOnlyElement(currentValue.getWindows()); + InputT firstValue = currentValue.getValue().getValue(); + AccumT accumulator = flinkCombiner.firstInput( + key, firstValue, options, sideInputReader, currentValue.getWindows()); + + // we use this to keep track of the timestamps assigned by the OutputTimeFn + Instant windowTimestamp = + outputTimeFn.assignOutputTime(currentValue.getTimestamp(), currentWindow); + + while (iterator.hasNext()) { + WindowedValue<KV<K, InputT>> nextValue = iterator.next(); + BoundedWindow nextWindow = Iterables.getOnlyElement(nextValue.getWindows()); + + if (currentWindow.equals(nextWindow)) { + // continue accumulating and merge windows + + InputT value = nextValue.getValue().getValue(); + accumulator = flinkCombiner.addInput(key, accumulator, value, + options, sideInputReader, currentValue.getWindows()); + + windowTimestamp = outputTimeFn.combine( + windowTimestamp, + outputTimeFn.assignOutputTime(nextValue.getTimestamp(), currentWindow)); + + } else { + // emit the value that we currently have + out.collect( + WindowedValue.of( + KV.of(key, flinkCombiner.extractOutput(key, accumulator, + options, sideInputReader, currentValue.getWindows())), + windowTimestamp, + currentWindow, + PaneInfo.NO_FIRING)); + + currentWindow = nextWindow; + currentValue = nextValue; + InputT value = nextValue.getValue().getValue(); + accumulator = flinkCombiner.firstInput(key, value, + options, sideInputReader, currentValue.getWindows()); + windowTimestamp = outputTimeFn.assignOutputTime(nextValue.getTimestamp(), currentWindow); + } + + } + + // emit the final accumulator + out.collect( + WindowedValue.of( + KV.of(key, flinkCombiner.extractOutput(key, accumulator, + options, sideInputReader, currentValue.getWindows())), + windowTimestamp, + currentWindow, + PaneInfo.NO_FIRING)); + } + + /** + * Merge windows. This assumes that the list of elements is sorted by window-end timestamp. + * This replaces windows in the input list. 
+ */ + private void mergeWindow(List<WindowedValue<KV<K, InputT>>> elements) { + int currentStart = 0; + IntervalWindow currentWindow = + (IntervalWindow) Iterables.getOnlyElement(elements.get(0).getWindows()); + + for (int i = 1; i < elements.size(); i++) { + WindowedValue<KV<K, InputT>> nextValue = elements.get(i); + IntervalWindow nextWindow = + (IntervalWindow) Iterables.getOnlyElement(nextValue.getWindows()); + if (currentWindow.intersects(nextWindow)) { + // we continue + currentWindow = currentWindow.span(nextWindow); + } else { + // retrofit the merged window to all windows up to "currentStart" + for (int j = i - 1; j >= currentStart; j--) { + WindowedValue<KV<K, InputT>> value = elements.get(j); + elements.set( + j, + WindowedValue.of( + value.getValue(), value.getTimestamp(), currentWindow, value.getPane())); + } + currentStart = i; + currentWindow = nextWindow; + } + } + if (currentStart < elements.size() - 1) { + // we have to retrofit the last batch + for (int j = elements.size() - 1; j >= currentStart; j--) { + WindowedValue<KV<K, InputT>> value = elements.get(j); + elements.set( + j, + WindowedValue.of( + value.getValue(), value.getTimestamp(), currentWindow, value.getPane())); + } + } + } +}