Fix Dangling Flink DataSets
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/26fa0b21 Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/26fa0b21 Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/26fa0b21 Branch: refs/heads/master Commit: 26fa0b21cfda3049e26d47ce174a9b29fe3ec29c Parents: 1664c96 Author: Aljoscha Krettek <aljoscha.kret...@gmail.com> Authored: Fri May 6 08:26:50 2016 +0200 Committer: Aljoscha Krettek <aljoscha.kret...@gmail.com> Committed: Fri May 20 08:08:24 2016 +0200 ---------------------------------------------------------------------- .../translation/FlinkBatchPipelineTranslator.java | 14 ++++++++++++++ .../translation/FlinkBatchTranslationContext.java | 18 +++++++++++++++++- 2 files changed, 31 insertions(+), 1 deletion(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/26fa0b21/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/FlinkBatchPipelineTranslator.java ---------------------------------------------------------------------- diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/FlinkBatchPipelineTranslator.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/FlinkBatchPipelineTranslator.java index 3d39e81..512b822 100644 --- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/FlinkBatchPipelineTranslator.java +++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/FlinkBatchPipelineTranslator.java @@ -17,6 +17,7 @@ */ package org.apache.beam.runners.flink.translation; +import org.apache.beam.sdk.Pipeline; import org.apache.beam.sdk.options.PipelineOptions; import org.apache.beam.sdk.runners.TransformTreeNode; import org.apache.beam.sdk.transforms.AppliedPTransform; @@ -24,7 +25,9 @@ import 
org.apache.beam.sdk.transforms.PTransform; import org.apache.beam.sdk.transforms.join.CoGroupByKey; import org.apache.beam.sdk.values.PValue; +import org.apache.flink.api.java.DataSet; import org.apache.flink.api.java.ExecutionEnvironment; +import org.apache.flink.api.java.io.DiscardingOutputFormat; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -47,6 +50,17 @@ public class FlinkBatchPipelineTranslator extends FlinkPipelineTranslator { this.batchContext = new FlinkBatchTranslationContext(env, options); } + @Override + @SuppressWarnings("rawtypes, unchecked") + public void translate(Pipeline pipeline) { + super.translate(pipeline); + + // terminate dangling DataSets + for (DataSet<?> dataSet: batchContext.getDanglingDataSets().values()) { + dataSet.output(new DiscardingOutputFormat()); + } + } + // -------------------------------------------------------------------------------------------- // Pipeline Visitor Methods // -------------------------------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/26fa0b21/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/FlinkBatchTranslationContext.java ---------------------------------------------------------------------- diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/FlinkBatchTranslationContext.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/FlinkBatchTranslationContext.java index 71950cf..501b1ea 100644 --- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/FlinkBatchTranslationContext.java +++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/FlinkBatchTranslationContext.java @@ -43,6 +43,13 @@ public class FlinkBatchTranslationContext { private final Map<PValue, DataSet<?>> dataSets; private final Map<PCollectionView<?>, DataSet<?>> broadcastDataSets; + /** + * For keeping track 
of which DataSets don't have a successor. We + * need to terminate these with a discarding sink because the Beam + * model allows dangling operations. + */ + private final Map<PValue, DataSet<?>> danglingDataSets; + private final ExecutionEnvironment env; private final PipelineOptions options; @@ -55,10 +62,16 @@ public class FlinkBatchTranslationContext { this.options = options; this.dataSets = new HashMap<>(); this.broadcastDataSets = new HashMap<>(); + + this.danglingDataSets = new HashMap<>(); } // ------------------------------------------------------------------------ - + + public Map<PValue, DataSet<?>> getDanglingDataSets() { + return danglingDataSets; + } + public ExecutionEnvironment getExecutionEnvironment() { return env; } @@ -69,12 +82,15 @@ public class FlinkBatchTranslationContext { @SuppressWarnings("unchecked") public <T> DataSet<T> getInputDataSet(PValue value) { + // assume that the DataSet is used as an input if retrieved here + danglingDataSets.remove(value); return (DataSet<T>) dataSets.get(value); } public void setOutputDataSet(PValue value, DataSet<?> set) { if (!dataSets.containsKey(value)) { dataSets.put(value, set); + danglingDataSets.put(value, set); } }