Repository: incubator-beam
Updated Branches:
  refs/heads/master 3e4b2fd0d -> 8d1214a3b
Replace WindowAssignment OldDoFn by FlatMap in Flink Runner The streaming runner had an OldDoFn that was used for assigning windows using a WindowFn. This is now done with a FlatMap. Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/4a097729 Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/4a097729 Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/4a097729 Branch: refs/heads/master Commit: 4a097729ac9fc65283f4f11f85812188589c8df3 Parents: 3e4b2fd Author: Aljoscha Krettek <aljoscha.kret...@gmail.com> Authored: Tue Nov 8 11:03:21 2016 +0100 Committer: Aljoscha Krettek <aljoscha.kret...@gmail.com> Committed: Thu Nov 24 11:39:30 2016 +0100 ---------------------------------------------------------------------- .../FlinkStreamingTransformTranslators.java | 63 +++----------------- 1 file changed, 9 insertions(+), 54 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/4a097729/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/FlinkStreamingTransformTranslators.java ---------------------------------------------------------------------- diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/FlinkStreamingTransformTranslators.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/FlinkStreamingTransformTranslators.java index 40dfbb9..47935eb 100644 --- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/FlinkStreamingTransformTranslators.java +++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/FlinkStreamingTransformTranslators.java @@ -18,7 +18,6 @@ package org.apache.beam.runners.flink.translation; -import com.google.common.collect.Iterables; import com.google.common.collect.Lists; import com.google.common.collect.Maps; 
import java.nio.ByteBuffer; @@ -31,6 +30,7 @@ import java.util.Map; import java.util.Set; import org.apache.beam.runners.core.SystemReduceFn; import org.apache.beam.runners.flink.FlinkRunner; +import org.apache.beam.runners.flink.translation.functions.FlinkAssignWindows; import org.apache.beam.runners.flink.translation.types.CoderTypeInformation; import org.apache.beam.runners.flink.translation.types.FlinkCoder; import org.apache.beam.runners.flink.translation.wrappers.streaming.DoFnOperator; @@ -53,7 +53,6 @@ import org.apache.beam.sdk.transforms.Combine; import org.apache.beam.sdk.transforms.DoFn; import org.apache.beam.sdk.transforms.Flatten; import org.apache.beam.sdk.transforms.GroupByKey; -import org.apache.beam.sdk.transforms.OldDoFn; import org.apache.beam.sdk.transforms.PTransform; import org.apache.beam.sdk.transforms.ParDo; import org.apache.beam.sdk.transforms.join.RawUnionValue; @@ -637,64 +636,20 @@ public class FlinkStreamingTransformTranslators { TypeInformation<WindowedValue<T>> typeInfo = context.getTypeInfo(context.getOutput(transform)); - OldDoFn<T, T> windowAssignerDoFn = - createWindowAssigner(windowingStrategy.getWindowFn()); - - @SuppressWarnings("unchecked") - PCollection<T> inputPCollection = context.getInput(transform); - - TypeInformation<WindowedValue<T>> inputTypeInfo = - context.getTypeInfo(inputPCollection); - - DoFnOperator<T, T, WindowedValue<T>> doFnOperator = new DoFnOperator<>( - windowAssignerDoFn, - inputTypeInfo, - new TupleTag<T>("main output"), - Collections.<TupleTag<?>>emptyList(), - new DoFnOperator.DefaultOutputManagerFactory<WindowedValue<T>>(), - windowingStrategy, - new HashMap<Integer, PCollectionView<?>>(), /* side-input mapping */ - Collections.<PCollectionView<?>>emptyList(), /* side inputs */ - context.getPipelineOptions()); - DataStream<WindowedValue<T>> inputDataStream = context.getInputDataStream(context.getInput(transform)); - SingleOutputStreamOperator<WindowedValue<T>> outDataStream = inputDataStream - 
.transform(transform.getName(), typeInfo, doFnOperator); - - context.setOutputDataStream(context.getOutput(transform), outDataStream); - } + WindowFn<T, ? extends BoundedWindow> windowFn = windowingStrategy.getWindowFn(); - private static <T, W extends BoundedWindow> OldDoFn<T, T> createWindowAssigner( - final WindowFn<T, W> windowFn) { + FlinkAssignWindows<T, ? extends BoundedWindow> assignWindowsFunction = + new FlinkAssignWindows<>(windowFn); - return new OldDoFn<T, T>() { + SingleOutputStreamOperator<WindowedValue<T>> outputDataStream = inputDataStream + .flatMap(assignWindowsFunction) + .name(context.getOutput(transform).getName()) + .returns(typeInfo); - @Override - public void processElement(final ProcessContext c) throws Exception { - Collection<W> windows = windowFn.assignWindows( - windowFn.new AssignContext() { - @Override - public T element() { - return c.element(); - } - - @Override - public Instant timestamp() { - return c.timestamp(); - } - - @Override - public BoundedWindow window() { - return Iterables.getOnlyElement(c.windowingInternals().windows()); - } - }); - - c.windowingInternals().outputWindowedValue( - c.element(), c.timestamp(), windows, c.pane()); - } - }; + context.setOutputDataStream(context.getOutput(transform), outputDataStream); } }