Repository: beam Updated Branches: refs/heads/master b3827955e -> 8160924e1
Ensure all Read outputs are consumed in Flink runner Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/217f24b9 Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/217f24b9 Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/217f24b9 Branch: refs/heads/master Commit: 217f24b95d9c80d658afc23511e6de33220a5c5d Parents: 2c69d25 Author: JingsongLi <lzljs3620...@aliyun.com> Authored: Fri Apr 21 09:56:46 2017 +0800 Committer: Aljoscha Krettek <aljoscha.kret...@gmail.com> Committed: Fri Apr 21 11:21:41 2017 +0200 ---------------------------------------------------------------------- .../beam/runners/flink/FlinkStreamingPipelineTranslator.java | 3 +++ 1 file changed, 3 insertions(+) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/beam/blob/217f24b9/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingPipelineTranslator.java ---------------------------------------------------------------------- diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingPipelineTranslator.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingPipelineTranslator.java index 0459ef7..42d75cf 100644 --- a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingPipelineTranslator.java +++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkStreamingPipelineTranslator.java @@ -25,6 +25,7 @@ import org.apache.beam.runners.core.construction.PTransformMatchers; import org.apache.beam.runners.core.construction.PTransformReplacements; import org.apache.beam.runners.core.construction.ReplacementOutputs; import org.apache.beam.runners.core.construction.SingleInputOutputOverrideFactory; +import org.apache.beam.runners.core.construction.UnconsumedReads; import org.apache.beam.sdk.Pipeline; import org.apache.beam.sdk.options.PipelineOptions; import org.apache.beam.sdk.runners.PTransformOverride; @@ -112,6 +113,8 @@ class FlinkStreamingPipelineTranslator extends FlinkPipelineTranslator { flinkRunner))) .build(); + // Ensure all outputs of all reads are consumed. + UnconsumedReads.ensureAllReadsConsumed(pipeline); pipeline.replaceAll(transformOverrides); super.translate(pipeline); }