[runner] add Create transform
Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/3227fccd Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/3227fccd Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/3227fccd Branch: refs/heads/master Commit: 3227fccdb0c4e09c6367b25d436c7e89cd881a2d Parents: 0be42cb Author: Max <m...@posteo.de> Authored: Mon Feb 22 18:25:23 2016 +0100 Committer: Davor Bonaci <davorbon...@users.noreply.github.com> Committed: Fri Mar 4 10:04:23 2016 -0800 ---------------------------------------------------------------------- .../FlinkStreamingTransformTranslators.java | 60 +++++++++++++++++-- .../io/FlinkStreamingCreateFunction.java | 61 ++++++++++++++++++++ 2 files changed, 117 insertions(+), 4 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/3227fccd/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/FlinkStreamingTransformTranslators.java ---------------------------------------------------------------------- diff --git a/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/FlinkStreamingTransformTranslators.java b/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/FlinkStreamingTransformTranslators.java index 17583cd..46d3e36 100644 --- a/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/FlinkStreamingTransformTranslators.java +++ b/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/FlinkStreamingTransformTranslators.java @@ -13,11 +13,13 @@ * See the License for the specific language governing permissions and * limitations under the License. */ + package com.dataartisans.flink.dataflow.translation; import com.dataartisans.flink.dataflow.translation.functions.UnionCoder; import com.dataartisans.flink.dataflow.translation.types.CoderTypeInformation; import com.dataartisans.flink.dataflow.translation.wrappers.streaming.*; +import com.dataartisans.flink.dataflow.translation.wrappers.streaming.io.FlinkStreamingCreateFunction; import com.dataartisans.flink.dataflow.translation.wrappers.streaming.io.UnboundedFlinkSource; import com.dataartisans.flink.dataflow.translation.wrappers.streaming.io.UnboundedSourceWrapper; import com.google.api.client.util.Maps; @@ -37,6 +39,7 @@ import com.google.cloud.dataflow.sdk.values.TupleTag; import com.google.common.collect.Lists; import org.apache.flink.api.common.functions.FilterFunction; import org.apache.flink.api.common.functions.FlatMapFunction; +import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.core.fs.FileSystem; import org.apache.flink.streaming.api.datastream.*; import org.apache.flink.util.Collector; @@ -44,6 +47,8 @@ import org.joda.time.Instant; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.io.ByteArrayOutputStream; +import java.io.IOException; import java.util.*; /** @@ -64,6 +69,8 @@ public class FlinkStreamingTransformTranslators { // here you can find all the available translators. static { + + TRANSLATORS.put(Create.Values.class, new CreateStreamingTranslator()); TRANSLATORS.put(Read.Unbounded.class, new UnboundedReadSourceTranslator()); TRANSLATORS.put(ParDo.Bound.class, new ParDoBoundStreamingTranslator()); TRANSLATORS.put(TextIO.Write.Bound.class, new TextIOWriteBoundStreamingTranslator()); @@ -83,6 +90,47 @@ public class FlinkStreamingTransformTranslators { // Transformation Implementations // -------------------------------------------------------------------------------------------- + private static class CreateStreamingTranslator<OUT> implements + FlinkStreamingPipelineTranslator.StreamTransformTranslator<Create.Values<OUT>> { + + @Override + public void translateNode(Create.Values<OUT> transform, FlinkStreamingTranslationContext context) { + PCollection<OUT> output = context.getOutput(transform); + Iterable<OUT> elements = transform.getElements(); + + // we need to serialize the elements to byte arrays, since they might contain + // elements that are not serializable by Java serialization. We deserialize them + // in the FlatMap function using the Coder. + + List<byte[]> serializedElements = Lists.newArrayList(); + Coder<OUT> elementCoder = context.getOutput(transform).getCoder(); + for (OUT element: elements) { + ByteArrayOutputStream bao = new ByteArrayOutputStream(); + try { + elementCoder.encode(element, bao, Coder.Context.OUTER); + serializedElements.add(bao.toByteArray()); + } catch (IOException e) { + throw new RuntimeException("Could not serialize Create elements using Coder: " + e); + } + } + + + DataStream<Integer> initDataSet = context.getExecutionEnvironment().fromElements(1); + + FlinkStreamingCreateFunction<Integer, OUT> createFunction = + new FlinkStreamingCreateFunction<>(serializedElements, elementCoder); + + WindowedValue.ValueOnlyWindowedValueCoder<OUT> windowCoder = WindowedValue.getValueOnlyCoder(elementCoder); + TypeInformation<WindowedValue<OUT>> outputType = new CoderTypeInformation<>(windowCoder); + + DataStream<WindowedValue<OUT>> outputDataStream = initDataSet.flatMap(createFunction) + .returns(outputType); + + context.setOutputDataStream(context.getOutput(transform), outputDataStream); + } + } + + private static class TextIOWriteBoundStreamingTranslator<T> implements FlinkStreamingPipelineTranslator.StreamTransformTranslator<TextIO.Write.Bound<T>> { private static final Logger LOG = LoggerFactory.getLogger(TextIOWriteBoundStreamingTranslator.class); @@ -151,12 +199,16 @@ public class FlinkStreamingTransformTranslators { (WindowingStrategy<OUT, ? extends BoundedWindow>) context.getOutput(transform).getWindowingStrategy(); - WindowedValue.WindowedValueCoder<OUT> outputStreamCoder = WindowedValue.getFullCoder(output.getCoder(), windowingStrategy.getWindowFn().windowCoder()); - CoderTypeInformation<WindowedValue<OUT>> outputWindowedValueCoder = new CoderTypeInformation<>(outputStreamCoder); + WindowedValue.WindowedValueCoder<OUT> outputStreamCoder = WindowedValue.getFullCoder(output.getCoder(), + windowingStrategy.getWindowFn().windowCoder()); + CoderTypeInformation<WindowedValue<OUT>> outputWindowedValueCoder = + new CoderTypeInformation<>(outputStreamCoder); - FlinkParDoBoundWrapper<IN, OUT> doFnWrapper = new FlinkParDoBoundWrapper<>(context.getPipelineOptions(), windowingStrategy, transform.getFn()); + FlinkParDoBoundWrapper<IN, OUT> doFnWrapper = new FlinkParDoBoundWrapper<>( + context.getPipelineOptions(), windowingStrategy, transform.getFn()); DataStream<WindowedValue<IN>> inputDataStream = context.getInputDataStream(context.getInput(transform)); - SingleOutputStreamOperator<WindowedValue<OUT>, ?> outDataStream = inputDataStream.flatMap(doFnWrapper).returns(outputWindowedValueCoder); + SingleOutputStreamOperator<WindowedValue<OUT>, ?> outDataStream = inputDataStream.flatMap(doFnWrapper) + .returns(outputWindowedValueCoder); context.setOutputDataStream(context.getOutput(transform), outDataStream); } http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/3227fccd/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/wrappers/streaming/io/FlinkStreamingCreateFunction.java ---------------------------------------------------------------------- diff --git a/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/wrappers/streaming/io/FlinkStreamingCreateFunction.java b/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/wrappers/streaming/io/FlinkStreamingCreateFunction.java new file mode 100644 index 0000000..b8824f5 --- /dev/null +++ b/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/wrappers/streaming/io/FlinkStreamingCreateFunction.java @@ -0,0 +1,61 @@ +/* + * Copyright 2015 Data Artisans GmbH + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.dataartisans.flink.dataflow.translation.wrappers.streaming.io; + +import com.dataartisans.flink.dataflow.translation.types.VoidCoderTypeSerializer; +import com.google.cloud.dataflow.sdk.coders.Coder; +import com.google.cloud.dataflow.sdk.transforms.windowing.GlobalWindow; +import com.google.cloud.dataflow.sdk.transforms.windowing.PaneInfo; +import com.google.cloud.dataflow.sdk.util.WindowedValue; +import org.apache.flink.api.common.functions.FlatMapFunction; +import org.apache.flink.util.Collector; + +import java.io.ByteArrayInputStream; +import java.util.List; + +/** + * This flat map function bootstraps from collection elements and turns them into WindowedValues + * (as required by the Flink runner). + */ +public class FlinkStreamingCreateFunction<IN, OUT> implements FlatMapFunction<IN, WindowedValue<OUT>> { + + private final List<byte[]> elements; + private final Coder<OUT> coder; + + public FlinkStreamingCreateFunction(List<byte[]> elements, Coder<OUT> coder) { + this.elements = elements; + this.coder = coder; + } + + @Override + public void flatMap(IN value, Collector<WindowedValue<OUT>> out) throws Exception { + + @SuppressWarnings("unchecked") + // TODO Flink doesn't allow null values in records + OUT voidValue = (OUT) VoidCoderTypeSerializer.VoidValue.INSTANCE; + + for (byte[] element : elements) { + ByteArrayInputStream bai = new ByteArrayInputStream(element); + OUT outValue = coder.decode(bai, Coder.Context.OUTER); + + if (outValue == null) { + out.collect(WindowedValue.of(voidValue, GlobalWindow.TIMESTAMP_MIN_VALUE, GlobalWindow.INSTANCE, PaneInfo.NO_FIRING)); + } else { + out.collect(WindowedValue.of(outValue, GlobalWindow.TIMESTAMP_MIN_VALUE, GlobalWindow.INSTANCE, PaneInfo.NO_FIRING)); + } + } + } +}