Repository: incubator-beam
Updated Branches:
  refs/heads/master 8bfa08519 -> ee55f6e39


[BEAM-965] Set Correct Output Type on Sources in Flink Stream Runner


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/a61832a8
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/a61832a8
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/a61832a8

Branch: refs/heads/master
Commit: a61832a83c15e0aa5e3cc1985fc32aa1afb348e6
Parents: 8bfa085
Author: Aljoscha Krettek <aljoscha.kret...@gmail.com>
Authored: Fri Nov 11 23:58:16 2016 +0100
Committer: Aljoscha Krettek <aljoscha.kret...@gmail.com>
Committed: Mon Nov 14 22:07:45 2016 +0100

----------------------------------------------------------------------
 .../FlinkStreamingTransformTranslators.java            | 13 ++++++++++---
 1 file changed, 10 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a61832a8/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/FlinkStreamingTransformTranslators.java
----------------------------------------------------------------------
diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/FlinkStreamingTransformTranslators.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/FlinkStreamingTransformTranslators.java
index a3e8a49..687e9c8 100644
--- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/FlinkStreamingTransformTranslators.java
+++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/FlinkStreamingTransformTranslators.java
@@ -219,6 +219,9 @@ public class FlinkStreamingTransformTranslators {
         FlinkStreamingTranslationContext context) {
       PCollection<T> output = context.getOutput(transform);
 
+      TypeInformation<WindowedValue<T>> outputTypeInfo =
+          context.getTypeInfo(context.getOutput(transform));
+
       DataStream<WindowedValue<T>> source;
       if (transform.getSource().getClass().equals(UnboundedFlinkSource.class)) 
{
         @SuppressWarnings("unchecked")
@@ -246,7 +249,7 @@ public class FlinkStreamingTransformTranslators {
                         new Instant(flinkAssigner.extractTimestamp(s, -1)),
                         GlobalWindow.INSTANCE,
                         PaneInfo.NO_FIRING));
-              }});
+              }}).returns(outputTypeInfo);
       } else {
         try {
           UnboundedSourceWrapper<T, ?> sourceWrapper =
@@ -256,7 +259,7 @@ public class FlinkStreamingTransformTranslators {
                   context.getExecutionEnvironment().getParallelism());
           source = context
               .getExecutionEnvironment()
-              .addSource(sourceWrapper).name(transform.getName());
+              
.addSource(sourceWrapper).name(transform.getName()).returns(outputTypeInfo);
         } catch (Exception e) {
           throw new RuntimeException(
               "Error while translating UnboundedSource: " + 
transform.getSource(), e);
@@ -276,6 +279,10 @@ public class FlinkStreamingTransformTranslators {
         FlinkStreamingTranslationContext context) {
       PCollection<T> output = context.getOutput(transform);
 
+      TypeInformation<WindowedValue<T>> outputTypeInfo =
+          context.getTypeInfo(context.getOutput(transform));
+
+
       DataStream<WindowedValue<T>> source;
       try {
         BoundedSourceWrapper<T> sourceWrapper =
@@ -285,7 +292,7 @@ public class FlinkStreamingTransformTranslators {
                 context.getExecutionEnvironment().getParallelism());
         source = context
             .getExecutionEnvironment()
-            .addSource(sourceWrapper).name(transform.getName());
+            
.addSource(sourceWrapper).name(transform.getName()).returns(outputTypeInfo);
       } catch (Exception e) {
         throw new RuntimeException(
             "Error while translating BoundedSource: " + transform.getSource(), 
e);

Reply via email to