Repository: incubator-beam
Updated Branches:
  refs/heads/master c53e0b162 -> 6807480a9


[BEAM-1096] Flink streaming side output optimization using SplitStream


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/f1a5704a
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/f1a5704a
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/f1a5704a

Branch: refs/heads/master
Commit: f1a5704a505b01d7d4649b61d1f6697859367964
Parents: c53e0b1
Author: Alexey Diomin <diomi...@gmail.com>
Authored: Wed Dec 7 09:48:35 2016 +0400
Committer: Aljoscha Krettek <aljoscha.kret...@gmail.com>
Committed: Thu Dec 8 09:55:22 2016 +0800

----------------------------------------------------------------------
 .../FlinkStreamingTransformTranslators.java     | 28 +++++++++++++-------
 1 file changed, 18 insertions(+), 10 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/f1a5704a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/FlinkStreamingTransformTranslators.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/FlinkStreamingTransformTranslators.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/FlinkStreamingTransformTranslators.java
index 47935eb..7b32c76 100644
--- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/FlinkStreamingTransformTranslators.java
+++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/FlinkStreamingTransformTranslators.java
@@ -78,11 +78,13 @@ import org.apache.flink.api.common.functions.RichFlatMapFunction;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.streaming.api.collector.selector.OutputSelector;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.datastream.DataStreamSink;
 import org.apache.flink.streaming.api.datastream.DataStreamSource;
 import org.apache.flink.streaming.api.datastream.KeyedStream;
 import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
+import org.apache.flink.streaming.api.datastream.SplitStream;
 import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks;
 import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
 import org.apache.flink.streaming.api.operators.TwoInputStreamOperator;
@@ -554,6 +556,14 @@ public class FlinkStreamingTransformTranslators {
             .transform(transform.getName(), outputUnionTypeInformation, doFnOperator);
       }
 
+      SplitStream<RawUnionValue> splitStream = unionOutputStream
+              .split(new OutputSelector<RawUnionValue>() {
+                @Override
+                public Iterable<String> select(RawUnionValue value) {
+                  return Collections.singletonList(Integer.toString(value.getUnionTag()));
+                }
+              });
+
       for (Map.Entry<TupleTag<?>, PCollection<?>> output : outputs.entrySet()) {
         final int outputTag = tagsToLabels.get(output.getKey());
 
@@ -561,17 +571,15 @@ public class FlinkStreamingTransformTranslators {
             context.getTypeInfo(output.getValue());
 
         @SuppressWarnings("unchecked")
-        DataStream filtered =
-            unionOutputStream.flatMap(new FlatMapFunction<RawUnionValue, Object>() {
-              @Override
-              public void flatMap(RawUnionValue value, Collector<Object> out) throws Exception {
-                if (value.getUnionTag() == outputTag) {
-                  out.collect(value.getValue());
-                }
-              }
-            }).returns(outputTypeInfo);
+        DataStream unwrapped = splitStream.select(String.valueOf(outputTag))
+          .flatMap(new FlatMapFunction<RawUnionValue, Object>() {
+            @Override
+            public void flatMap(RawUnionValue value, Collector<Object> out) throws Exception {
+              out.collect(value.getValue());
+            }
+          }).returns(outputTypeInfo);
 
-        context.setOutputDataStream(output.getValue(), filtered);
+        context.setOutputDataStream(output.getValue(), unwrapped);
       }
     }
 

Reply via email to