[runner] add Create transform

Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/3227fccd
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/3227fccd
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/3227fccd

Branch: refs/heads/master
Commit: 3227fccdb0c4e09c6367b25d436c7e89cd881a2d
Parents: 0be42cb
Author: Max <m...@posteo.de>
Authored: Mon Feb 22 18:25:23 2016 +0100
Committer: Davor Bonaci <davorbon...@users.noreply.github.com>
Committed: Fri Mar 4 10:04:23 2016 -0800

----------------------------------------------------------------------
 .../FlinkStreamingTransformTranslators.java     | 60 +++++++++++++++++--
 .../io/FlinkStreamingCreateFunction.java        | 61 ++++++++++++++++++++
 2 files changed, 117 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/3227fccd/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/FlinkStreamingTransformTranslators.java
----------------------------------------------------------------------
diff --git a/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/FlinkStreamingTransformTranslators.java b/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/FlinkStreamingTransformTranslators.java
index 17583cd..46d3e36 100644
--- a/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/FlinkStreamingTransformTranslators.java
+++ b/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/FlinkStreamingTransformTranslators.java
@@ -13,11 +13,13 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+
 package com.dataartisans.flink.dataflow.translation;
 
 import com.dataartisans.flink.dataflow.translation.functions.UnionCoder;
 import com.dataartisans.flink.dataflow.translation.types.CoderTypeInformation;
 import com.dataartisans.flink.dataflow.translation.wrappers.streaming.*;
+import 
com.dataartisans.flink.dataflow.translation.wrappers.streaming.io.FlinkStreamingCreateFunction;
 import 
com.dataartisans.flink.dataflow.translation.wrappers.streaming.io.UnboundedFlinkSource;
 import 
com.dataartisans.flink.dataflow.translation.wrappers.streaming.io.UnboundedSourceWrapper;
 import com.google.api.client.util.Maps;
@@ -37,6 +39,7 @@ import com.google.cloud.dataflow.sdk.values.TupleTag;
 import com.google.common.collect.Lists;
 import org.apache.flink.api.common.functions.FilterFunction;
 import org.apache.flink.api.common.functions.FlatMapFunction;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.core.fs.FileSystem;
 import org.apache.flink.streaming.api.datastream.*;
 import org.apache.flink.util.Collector;
@@ -44,6 +47,8 @@ import org.joda.time.Instant;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
 import java.util.*;
 
 /**
@@ -64,6 +69,8 @@ public class FlinkStreamingTransformTranslators {
 
        // here you can find all the available translators.
        static {
+
+               TRANSLATORS.put(Create.Values.class, new 
CreateStreamingTranslator());
                TRANSLATORS.put(Read.Unbounded.class, new 
UnboundedReadSourceTranslator());
                TRANSLATORS.put(ParDo.Bound.class, new 
ParDoBoundStreamingTranslator());
                TRANSLATORS.put(TextIO.Write.Bound.class, new 
TextIOWriteBoundStreamingTranslator());
@@ -83,6 +90,47 @@ public class FlinkStreamingTransformTranslators {
        //  Transformation Implementations
        // 
--------------------------------------------------------------------------------------------
 
+       private static class CreateStreamingTranslator<OUT> implements
+                       
FlinkStreamingPipelineTranslator.StreamTransformTranslator<Create.Values<OUT>> {
+
+               @Override
+               public void translateNode(Create.Values<OUT> transform, 
FlinkStreamingTranslationContext context) {
+                       PCollection<OUT> output = context.getOutput(transform);
+                       Iterable<OUT> elements = transform.getElements();
+
+                       // we need to serialize the elements to byte arrays, 
since they might contain
+                       // elements that are not serializable by Java 
serialization. We deserialize them
+                       // in the FlatMap function using the Coder.
+
+                       List<byte[]> serializedElements = Lists.newArrayList();
+                       Coder<OUT> elementCoder = 
context.getOutput(transform).getCoder();
+                       for (OUT element: elements) {
+                               ByteArrayOutputStream bao = new 
ByteArrayOutputStream();
+                               try {
+                                       elementCoder.encode(element, bao, 
Coder.Context.OUTER);
+                                       
serializedElements.add(bao.toByteArray());
+                               } catch (IOException e) {
+                                       throw new RuntimeException("Could not 
serialize Create elements using Coder: " + e);
+                               }
+                       }
+
+
+                       DataStream<Integer> initDataSet = 
context.getExecutionEnvironment().fromElements(1);
+
+                       FlinkStreamingCreateFunction<Integer, OUT> 
createFunction =
+                                       new 
FlinkStreamingCreateFunction<>(serializedElements, elementCoder);
+
+                       WindowedValue.ValueOnlyWindowedValueCoder<OUT> 
windowCoder = WindowedValue.getValueOnlyCoder(elementCoder);
+                       TypeInformation<WindowedValue<OUT>> outputType = new 
CoderTypeInformation<>(windowCoder);
+
+                       DataStream<WindowedValue<OUT>> outputDataStream = 
initDataSet.flatMap(createFunction)
+                                       .returns(outputType);
+
+                       
context.setOutputDataStream(context.getOutput(transform), outputDataStream);
+               }
+       }
+
+
        private static class TextIOWriteBoundStreamingTranslator<T> implements 
FlinkStreamingPipelineTranslator.StreamTransformTranslator<TextIO.Write.Bound<T>>
 {
                private static final Logger LOG = 
LoggerFactory.getLogger(TextIOWriteBoundStreamingTranslator.class);
 
@@ -151,12 +199,16 @@ public class FlinkStreamingTransformTranslators {
                                        (WindowingStrategy<OUT, ? extends 
BoundedWindow>)
                                                        
context.getOutput(transform).getWindowingStrategy();
 
-                       WindowedValue.WindowedValueCoder<OUT> outputStreamCoder 
= WindowedValue.getFullCoder(output.getCoder(), 
windowingStrategy.getWindowFn().windowCoder());
-                       CoderTypeInformation<WindowedValue<OUT>> 
outputWindowedValueCoder = new CoderTypeInformation<>(outputStreamCoder);
+                       WindowedValue.WindowedValueCoder<OUT> outputStreamCoder 
= WindowedValue.getFullCoder(output.getCoder(),
+                                       
windowingStrategy.getWindowFn().windowCoder());
+                       CoderTypeInformation<WindowedValue<OUT>> 
outputWindowedValueCoder =
+                                       new 
CoderTypeInformation<>(outputStreamCoder);
 
-                       FlinkParDoBoundWrapper<IN, OUT> doFnWrapper = new 
FlinkParDoBoundWrapper<>(context.getPipelineOptions(), windowingStrategy, 
transform.getFn());
+                       FlinkParDoBoundWrapper<IN, OUT> doFnWrapper = new 
FlinkParDoBoundWrapper<>(
+                                       context.getPipelineOptions(), 
windowingStrategy, transform.getFn());
                        DataStream<WindowedValue<IN>> inputDataStream = 
context.getInputDataStream(context.getInput(transform));
-                       SingleOutputStreamOperator<WindowedValue<OUT>, ?> 
outDataStream = 
inputDataStream.flatMap(doFnWrapper).returns(outputWindowedValueCoder);
+                       SingleOutputStreamOperator<WindowedValue<OUT>, ?> 
outDataStream = inputDataStream.flatMap(doFnWrapper)
+                                       .returns(outputWindowedValueCoder);
 
                        
context.setOutputDataStream(context.getOutput(transform), outDataStream);
                }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/3227fccd/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/wrappers/streaming/io/FlinkStreamingCreateFunction.java
----------------------------------------------------------------------
diff --git a/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/wrappers/streaming/io/FlinkStreamingCreateFunction.java b/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/wrappers/streaming/io/FlinkStreamingCreateFunction.java
new file mode 100644
index 0000000..b8824f5
--- /dev/null
+++ b/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/wrappers/streaming/io/FlinkStreamingCreateFunction.java
@@ -0,0 +1,61 @@
+/*
+ * Copyright 2015 Data Artisans GmbH
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.dataartisans.flink.dataflow.translation.wrappers.streaming.io;
+
+import 
com.dataartisans.flink.dataflow.translation.types.VoidCoderTypeSerializer;
+import com.google.cloud.dataflow.sdk.coders.Coder;
+import com.google.cloud.dataflow.sdk.transforms.windowing.GlobalWindow;
+import com.google.cloud.dataflow.sdk.transforms.windowing.PaneInfo;
+import com.google.cloud.dataflow.sdk.util.WindowedValue;
+import org.apache.flink.api.common.functions.FlatMapFunction;
+import org.apache.flink.util.Collector;
+
+import java.io.ByteArrayInputStream;
+import java.util.List;
+
+/**
+ * This flat map function bootstraps from collection elements and turns them 
into WindowedValues
+ * (as required by the Flink runner).
+ */
+public class FlinkStreamingCreateFunction<IN, OUT> implements 
FlatMapFunction<IN, WindowedValue<OUT>> {
+
+       private final List<byte[]> elements;
+       private final Coder<OUT> coder;
+
+       public FlinkStreamingCreateFunction(List<byte[]> elements, Coder<OUT> 
coder) {
+               this.elements = elements;
+               this.coder = coder;
+       }
+
+       @Override
+       public void flatMap(IN value, Collector<WindowedValue<OUT>> out) throws 
Exception {
+
+               @SuppressWarnings("unchecked")
+               // TODO Flink doesn't allow null values in records
+               OUT voidValue = (OUT) 
VoidCoderTypeSerializer.VoidValue.INSTANCE;
+
+               for (byte[] element : elements) {
+                       ByteArrayInputStream bai = new 
ByteArrayInputStream(element);
+                       OUT outValue = coder.decode(bai, Coder.Context.OUTER);
+
+                       if (outValue == null) {
+                               out.collect(WindowedValue.of(voidValue, 
GlobalWindow.TIMESTAMP_MIN_VALUE, GlobalWindow.INSTANCE, PaneInfo.NO_FIRING));
+                       } else {
+                               out.collect(WindowedValue.of(outValue, 
GlobalWindow.TIMESTAMP_MIN_VALUE, GlobalWindow.INSTANCE, PaneInfo.NO_FIRING));
+                       }
+               }
+       }
+}

Reply via email to