Repository: incubator-beam Updated Branches: refs/heads/master 8bfa08519 -> ee55f6e39
[BEAM-965] Set Correct Output Type on Sources in Flink Stream Runner Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/a61832a8 Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/a61832a8 Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/a61832a8 Branch: refs/heads/master Commit: a61832a83c15e0aa5e3cc1985fc32aa1afb348e6 Parents: 8bfa085 Author: Aljoscha Krettek <aljoscha.kret...@gmail.com> Authored: Fri Nov 11 23:58:16 2016 +0100 Committer: Aljoscha Krettek <aljoscha.kret...@gmail.com> Committed: Mon Nov 14 22:07:45 2016 +0100 ---------------------------------------------------------------------- .../FlinkStreamingTransformTranslators.java | 13 ++++++++++--- 1 file changed, 10 insertions(+), 3 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a61832a8/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/FlinkStreamingTransformTranslators.java ---------------------------------------------------------------------- diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/FlinkStreamingTransformTranslators.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/FlinkStreamingTransformTranslators.java index a3e8a49..687e9c8 100644 --- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/FlinkStreamingTransformTranslators.java +++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/FlinkStreamingTransformTranslators.java @@ -219,6 +219,9 @@ public class FlinkStreamingTransformTranslators { FlinkStreamingTranslationContext context) { PCollection<T> output = context.getOutput(transform); + TypeInformation<WindowedValue<T>> outputTypeInfo = + context.getTypeInfo(context.getOutput(transform)); + DataStream<WindowedValue<T>> source; if (transform.getSource().getClass().equals(UnboundedFlinkSource.class)) { @SuppressWarnings("unchecked") @@ -246,7 +249,7 @@ public class FlinkStreamingTransformTranslators { new Instant(flinkAssigner.extractTimestamp(s, -1)), GlobalWindow.INSTANCE, PaneInfo.NO_FIRING)); - }}); + }}).returns(outputTypeInfo); } else { try { UnboundedSourceWrapper<T, ?> sourceWrapper = @@ -256,7 +259,7 @@ public class FlinkStreamingTransformTranslators { context.getExecutionEnvironment().getParallelism()); source = context .getExecutionEnvironment() - .addSource(sourceWrapper).name(transform.getName()); + .addSource(sourceWrapper).name(transform.getName()).returns(outputTypeInfo); } catch (Exception e) { throw new RuntimeException( "Error while translating UnboundedSource: " + transform.getSource(), e); @@ -276,6 +279,10 @@ public class FlinkStreamingTransformTranslators { FlinkStreamingTranslationContext context) { PCollection<T> output = context.getOutput(transform); + TypeInformation<WindowedValue<T>> outputTypeInfo = + context.getTypeInfo(context.getOutput(transform)); + + DataStream<WindowedValue<T>> source; try { BoundedSourceWrapper<T> sourceWrapper = @@ -285,7 +292,7 @@ public class FlinkStreamingTransformTranslators { context.getExecutionEnvironment().getParallelism()); source = context .getExecutionEnvironment() - .addSource(sourceWrapper).name(transform.getName()); + .addSource(sourceWrapper).name(transform.getName()).returns(outputTypeInfo); } catch (Exception e) { throw new RuntimeException( "Error while translating BoundedSource: " + transform.getSource(), e);