Repository: incubator-beam Updated Branches: refs/heads/master c53e0b162 -> 6807480a9
[BEAM-1096] Flink streaming side output optimization using SplitStream Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/f1a5704a Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/f1a5704a Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/f1a5704a Branch: refs/heads/master Commit: f1a5704a505b01d7d4649b61d1f6697859367964 Parents: c53e0b1 Author: Alexey Diomin <diomi...@gmail.com> Authored: Wed Dec 7 09:48:35 2016 +0400 Committer: Aljoscha Krettek <aljoscha.kret...@gmail.com> Committed: Thu Dec 8 09:55:22 2016 +0800 ---------------------------------------------------------------------- .../FlinkStreamingTransformTranslators.java | 28 +++++++++++++------- 1 file changed, 18 insertions(+), 10 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/f1a5704a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/FlinkStreamingTransformTranslators.java ---------------------------------------------------------------------- diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/FlinkStreamingTransformTranslators.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/FlinkStreamingTransformTranslators.java index 47935eb..7b32c76 100644 --- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/FlinkStreamingTransformTranslators.java +++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/FlinkStreamingTransformTranslators.java @@ -78,11 +78,13 @@ import org.apache.flink.api.common.functions.RichFlatMapFunction; import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.core.fs.FileSystem; +import org.apache.flink.streaming.api.collector.selector.OutputSelector; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.datastream.DataStreamSink; import org.apache.flink.streaming.api.datastream.DataStreamSource; import org.apache.flink.streaming.api.datastream.KeyedStream; import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator; +import org.apache.flink.streaming.api.datastream.SplitStream; import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks; import org.apache.flink.streaming.api.operators.OneInputStreamOperator; import org.apache.flink.streaming.api.operators.TwoInputStreamOperator; @@ -554,6 +556,14 @@ public class FlinkStreamingTransformTranslators { .transform(transform.getName(), outputUnionTypeInformation, doFnOperator); } + SplitStream<RawUnionValue> splitStream = unionOutputStream + .split(new OutputSelector<RawUnionValue>() { + @Override + public Iterable<String> select(RawUnionValue value) { + return Collections.singletonList(Integer.toString(value.getUnionTag())); + } + }); + for (Map.Entry<TupleTag<?>, PCollection<?>> output : outputs.entrySet()) { final int outputTag = tagsToLabels.get(output.getKey()); @@ -561,17 +571,15 @@ public class FlinkStreamingTransformTranslators { context.getTypeInfo(output.getValue()); @SuppressWarnings("unchecked") - DataStream filtered = - unionOutputStream.flatMap(new FlatMapFunction<RawUnionValue, Object>() { - @Override - public void flatMap(RawUnionValue value, Collector<Object> out) throws Exception { - if (value.getUnionTag() == outputTag) { - out.collect(value.getValue()); - } - } - }).returns(outputTypeInfo); + DataStream unwrapped = splitStream.select(String.valueOf(outputTag)) + .flatMap(new FlatMapFunction<RawUnionValue, Object>() { + @Override + public void flatMap(RawUnionValue value, Collector<Object> out) throws Exception { + out.collect(value.getValue()); + } + }).returns(outputTypeInfo); - context.setOutputDataStream(output.getValue(), filtered); + context.setOutputDataStream(output.getValue(), unwrapped); } }