Repository: beam
Updated Branches:
  refs/heads/master b3827955e -> 8160924e1


Ensure all Read outputs are consumed in Flink runner


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/217f24b9
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/217f24b9
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/217f24b9

Branch: refs/heads/master
Commit: 217f24b95d9c80d658afc23511e6de33220a5c5d
Parents: 2c69d25
Author: JingsongLi <lzljs3620...@aliyun.com>
Authored: Fri Apr 21 09:56:46 2017 +0800
Committer: Aljoscha Krettek <aljoscha.kret...@gmail.com>
Committed: Fri Apr 21 11:21:41 2017 +0200

----------------------------------------------------------------------
 .../beam/runners/flink/FlinkStreamingPipelineTranslator.java      | 3 +++
 1 file changed, 3 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/217f24b9/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingPipelineTranslator.java
----------------------------------------------------------------------
diff --git 
a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingPipelineTranslator.java
 
b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingPipelineTranslator.java
index 0459ef7..42d75cf 100644
--- 
a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingPipelineTranslator.java
+++ 
b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingPipelineTranslator.java
@@ -25,6 +25,7 @@ import 
org.apache.beam.runners.core.construction.PTransformMatchers;
 import org.apache.beam.runners.core.construction.PTransformReplacements;
 import org.apache.beam.runners.core.construction.ReplacementOutputs;
 import 
org.apache.beam.runners.core.construction.SingleInputOutputOverrideFactory;
+import org.apache.beam.runners.core.construction.UnconsumedReads;
 import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.options.PipelineOptions;
 import org.apache.beam.sdk.runners.PTransformOverride;
@@ -112,6 +113,8 @@ class FlinkStreamingPipelineTranslator extends 
FlinkPipelineTranslator {
                         flinkRunner)))
             .build();
 
+    // Ensure all outputs of all reads are consumed.
+    UnconsumedReads.ensureAllReadsConsumed(pipeline);
     pipeline.replaceAll(transformOverrides);
     super.translate(pipeline);
   }

Reply via email to