Fix faulty Flink Flatten when PCollectionList is empty
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/4e60a497 Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/4e60a497 Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/4e60a497 Branch: refs/heads/master Commit: 4e60a497b313414aa2b2968b8def6c6f753908fe Parents: 26fa0b2 Author: Aljoscha Krettek <aljoscha.kret...@gmail.com> Authored: Fri May 13 14:17:50 2016 +0200 Committer: Aljoscha Krettek <aljoscha.kret...@gmail.com> Committed: Fri May 20 08:08:24 2016 +0200 ---------------------------------------------------------------------- .../FlinkBatchTransformTranslators.java | 32 +++++++++++++++----- 1 file changed, 25 insertions(+), 7 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/4e60a497/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/FlinkBatchTransformTranslators.java ---------------------------------------------------------------------- diff --git a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/FlinkBatchTransformTranslators.java b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/FlinkBatchTransformTranslators.java index a03352e..07785aa 100644 --- a/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/FlinkBatchTransformTranslators.java +++ b/runners/flink/runner/src/main/java/org/apache/beam/runners/flink/translation/FlinkBatchTransformTranslators.java @@ -34,6 +34,7 @@ import org.apache.beam.runners.flink.translation.wrappers.SourceInputFormat; import org.apache.beam.sdk.coders.CannotProvideCoderException; import org.apache.beam.sdk.coders.Coder; import org.apache.beam.sdk.coders.KvCoder; +import org.apache.beam.sdk.coders.VoidCoder; import org.apache.beam.sdk.io.AvroIO; import org.apache.beam.sdk.io.BoundedSource; import org.apache.beam.sdk.io.Read; @@ -61,6 +62,7 @@ import org.apache.beam.sdk.values.TupleTag; import com.google.api.client.util.Maps; import com.google.common.collect.Lists; +import org.apache.flink.api.common.functions.FlatMapFunction; import org.apache.flink.api.common.functions.GroupReduceFunction; import org.apache.flink.api.common.operators.Keys; import org.apache.flink.api.common.typeinfo.TypeInformation; @@ -78,6 +80,7 @@ import org.apache.flink.api.java.operators.Grouping; import org.apache.flink.api.java.operators.MapPartitionOperator; import org.apache.flink.api.java.operators.UnsortedGrouping; import org.apache.flink.core.fs.Path; +import org.apache.flink.util.Collector; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -91,7 +94,7 @@ import java.util.Map; /** * Translators for transforming * Dataflow {@link org.apache.beam.sdk.transforms.PTransform}s to - * Flink {@link org.apache.flink.api.java.DataSet}s + * Flink {@link org.apache.flink.api.java.DataSet}s. */ public class FlinkBatchTransformTranslators { @@ -465,15 +468,30 @@ public class FlinkBatchTransformTranslators { private static class FlattenPCollectionTranslatorBatch<T> implements FlinkBatchPipelineTranslator.BatchTransformTranslator<Flatten.FlattenPCollectionList<T>> { @Override + @SuppressWarnings("unchecked") public void translateNode(Flatten.FlattenPCollectionList<T> transform, FlinkBatchTranslationContext context) { List<PCollection<T>> allInputs = context.getInput(transform).getAll(); DataSet<T> result = null; - for(PCollection<T> collection : allInputs) { - DataSet<T> current = context.getInputDataSet(collection); - if (result == null) { - result = current; - } else { - result = result.union(current); + if (allInputs.isEmpty()) { + // create an empty dummy source to satisfy downstream operations + // we cannot create an empty source in Flink, therefore we have to + // add the flatMap that simply never forwards the single element + DataSource<String> dummySource = + context.getExecutionEnvironment().fromElements("dummy"); + result = dummySource.flatMap(new FlatMapFunction<String, T>() { + @Override + public void flatMap(String s, Collector<T> collector) throws Exception { + // never return anything + } + }).returns(new CoderTypeInformation<>((Coder<T>) VoidCoder.of())); + } else { + for (PCollection<T> collection : allInputs) { + DataSet<T> current = context.getInputDataSet(collection); + if (result == null) { + result = current; + } else { + result = result.union(current); + } } } context.setOutputDataSet(context.getOutput(transform), result);