http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/8852eb15/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/FlinkBatchTranslationContext.java ---------------------------------------------------------------------- diff --git a/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/FlinkBatchTranslationContext.java b/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/FlinkBatchTranslationContext.java index 1072fa3..ec1d6c8 100644 --- a/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/FlinkBatchTranslationContext.java +++ b/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/FlinkBatchTranslationContext.java @@ -38,92 +38,92 @@ import java.util.HashMap; import java.util.Map; public class FlinkBatchTranslationContext { - - private final Map<PValue, DataSet<?>> dataSets; - private final Map<PCollectionView<?>, DataSet<?>> broadcastDataSets; + + private final Map<PValue, DataSet<?>> dataSets; + private final Map<PCollectionView<?>, DataSet<?>> broadcastDataSets; - private final ExecutionEnvironment env; - private final PipelineOptions options; + private final ExecutionEnvironment env; + private final PipelineOptions options; - private AppliedPTransform<?, ?, ?> currentTransform; - - // ------------------------------------------------------------------------ - - public FlinkBatchTranslationContext(ExecutionEnvironment env, PipelineOptions options) { - this.env = env; - this.options = options; - this.dataSets = new HashMap<>(); - this.broadcastDataSets = new HashMap<>(); - } - - // ------------------------------------------------------------------------ - - public ExecutionEnvironment getExecutionEnvironment() { - return env; - } + private AppliedPTransform<?, ?, ?> currentTransform; + + // ------------------------------------------------------------------------ + + public FlinkBatchTranslationContext(ExecutionEnvironment env, PipelineOptions options) { + this.env = env; + this.options = options; + this.dataSets = new HashMap<>(); + this.broadcastDataSets = new HashMap<>(); + } + + // ------------------------------------------------------------------------ + + public ExecutionEnvironment getExecutionEnvironment() { + return env; + } - public PipelineOptions getPipelineOptions() { - return options; - } - - @SuppressWarnings("unchecked") - public <T> DataSet<T> getInputDataSet(PValue value) { - return (DataSet<T>) dataSets.get(value); - } + public PipelineOptions getPipelineOptions() { + return options; + } + + @SuppressWarnings("unchecked") + public <T> DataSet<T> getInputDataSet(PValue value) { + return (DataSet<T>) dataSets.get(value); + } - public void setOutputDataSet(PValue value, DataSet<?> set) { - if (!dataSets.containsKey(value)) { - dataSets.put(value, set); - } - } + public void setOutputDataSet(PValue value, DataSet<?> set) { + if (!dataSets.containsKey(value)) { + dataSets.put(value, set); + } + } - /** - * Sets the AppliedPTransform which carries input/output. - * @param currentTransform - */ - public void setCurrentTransform(AppliedPTransform<?, ?, ?> currentTransform) { - this.currentTransform = currentTransform; - } + /** + * Sets the AppliedPTransform which carries input/output. 
+ * @param currentTransform + */ + public void setCurrentTransform(AppliedPTransform<?, ?, ?> currentTransform) { + this.currentTransform = currentTransform; + } - @SuppressWarnings("unchecked") - public <T> DataSet<T> getSideInputDataSet(PCollectionView<?> value) { - return (DataSet<T>) broadcastDataSets.get(value); - } + @SuppressWarnings("unchecked") + public <T> DataSet<T> getSideInputDataSet(PCollectionView<?> value) { + return (DataSet<T>) broadcastDataSets.get(value); + } - public void setSideInputDataSet(PCollectionView<?> value, DataSet<?> set) { - if (!broadcastDataSets.containsKey(value)) { - broadcastDataSets.put(value, set); - } - } - - @SuppressWarnings("unchecked") - public <T> TypeInformation<T> getTypeInfo(PInput output) { - if (output instanceof TypedPValue) { - Coder<?> outputCoder = ((TypedPValue) output).getCoder(); - if (outputCoder instanceof KvCoder) { - return new KvCoderTypeInformation((KvCoder) outputCoder); - } else { - return new CoderTypeInformation(outputCoder); - } - } - return new GenericTypeInfo<>((Class<T>)Object.class); - } + public void setSideInputDataSet(PCollectionView<?> value, DataSet<?> set) { + if (!broadcastDataSets.containsKey(value)) { + broadcastDataSets.put(value, set); + } + } + + @SuppressWarnings("unchecked") + public <T> TypeInformation<T> getTypeInfo(PInput output) { + if (output instanceof TypedPValue) { + Coder<?> outputCoder = ((TypedPValue) output).getCoder(); + if (outputCoder instanceof KvCoder) { + return new KvCoderTypeInformation((KvCoder) outputCoder); + } else { + return new CoderTypeInformation(outputCoder); + } + } + return new GenericTypeInfo<>((Class<T>)Object.class); + } - public <T> TypeInformation<T> getInputTypeInfo() { - return getTypeInfo(currentTransform.getInput()); - } + public <T> TypeInformation<T> getInputTypeInfo() { + return getTypeInfo(currentTransform.getInput()); + } - public <T> TypeInformation<T> getOutputTypeInfo() { - return getTypeInfo((PValue) currentTransform.getOutput()); - } + public <T> TypeInformation<T> getOutputTypeInfo() { + return getTypeInfo((PValue) currentTransform.getOutput()); + } - @SuppressWarnings("unchecked") - <I extends PInput> I getInput(PTransform<I, ?> transform) { - return (I) currentTransform.getInput(); - } + @SuppressWarnings("unchecked") + <I extends PInput> I getInput(PTransform<I, ?> transform) { + return (I) currentTransform.getInput(); + } - @SuppressWarnings("unchecked") - <O extends POutput> O getOutput(PTransform<?, O> transform) { - return (O) currentTransform.getOutput(); - } + @SuppressWarnings("unchecked") + <O extends POutput> O getOutput(PTransform<?, O> transform) { + return (O) currentTransform.getOutput(); + } }
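For orientation, a batch transform translator consumes and registers datasets through this context. Below is a minimal sketch of a hypothetical translateNode hook (mirroring the streaming StreamTransformTranslator interface further down in this commit); FlinkDoFnFunction is the DoFn wrapper included later in this patch, while the hook signature itself is an assumption, not part of the committed code:

    void translateNode(ParDo.Bound<IN, OUT> transform, FlinkBatchTranslationContext context) {
      // Look up the Flink DataSet that an upstream translator registered for our input.
      DataSet<IN> inputDataSet = context.getInputDataSet(context.getInput(transform));
      // Derive Flink type information from the Beam output coder.
      TypeInformation<OUT> outputType = context.getOutputTypeInfo();
      // Wrap the DoFn and apply it as a Flink mapPartition operation.
      FlinkDoFnFunction<IN, OUT> doFnWrapper =
          new FlinkDoFnFunction<>(transform.getFn(), context.getPipelineOptions());
      DataSet<OUT> outputDataSet = inputDataSet.mapPartition(doFnWrapper).returns(outputType);
      // Register the result so downstream translators can resolve their input.
      context.setOutputDataSet(context.getOutput(transform), outputDataSet);
    }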
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/8852eb15/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/FlinkPipelineTranslator.java ---------------------------------------------------------------------- diff --git a/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/FlinkPipelineTranslator.java b/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/FlinkPipelineTranslator.java index b56fe07..a6a333b 100644 --- a/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/FlinkPipelineTranslator.java +++ b/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/FlinkPipelineTranslator.java @@ -28,7 +28,7 @@ import com.google.cloud.dataflow.sdk.Pipeline; */ public abstract class FlinkPipelineTranslator implements Pipeline.PipelineVisitor { - public void translate(Pipeline pipeline) { - pipeline.traverseTopologically(this); - } + public void translate(Pipeline pipeline) { + pipeline.traverseTopologically(this); + } } http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/8852eb15/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/FlinkStreamingPipelineTranslator.java ---------------------------------------------------------------------- diff --git a/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/FlinkStreamingPipelineTranslator.java b/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/FlinkStreamingPipelineTranslator.java index ea9ed14..897303d 100644 --- a/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/FlinkStreamingPipelineTranslator.java +++ b/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/FlinkStreamingPipelineTranslator.java @@ -31,113 +31,113 @@ import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; * */ public class FlinkStreamingPipelineTranslator extends FlinkPipelineTranslator { - /** The necessary context in the case of a streaming job. */ - private final FlinkStreamingTranslationContext streamingContext; - - private int depth = 0; - - /** Composite transform that we want to translate before proceeding with other transforms.
*/ - private PTransform<?, ?> currentCompositeTransform; - - public FlinkStreamingPipelineTranslator(StreamExecutionEnvironment env, PipelineOptions options) { - this.streamingContext = new FlinkStreamingTranslationContext(env, options); - } - - // -------------------------------------------------------------------------------------------- - // Pipeline Visitor Methods - // -------------------------------------------------------------------------------------------- - - @Override - public void enterCompositeTransform(TransformTreeNode node) { - System.out.println(genSpaces(this.depth) + "enterCompositeTransform- " + formatNodeName(node)); - - PTransform<?, ?> transform = node.getTransform(); - if (transform != null && currentCompositeTransform == null) { - - StreamTransformTranslator<?> translator = FlinkStreamingTransformTranslators.getTranslator(transform); - if (translator != null) { - currentCompositeTransform = transform; - } - } - this.depth++; - } - - @Override - public void leaveCompositeTransform(TransformTreeNode node) { - PTransform<?, ?> transform = node.getTransform(); - if (transform != null && currentCompositeTransform == transform) { - - StreamTransformTranslator<?> translator = FlinkStreamingTransformTranslators.getTranslator(transform); - if (translator != null) { - System.out.println(genSpaces(this.depth) + "doingCompositeTransform- " + formatNodeName(node)); - applyStreamingTransform(transform, node, translator); - currentCompositeTransform = null; - } else { - throw new IllegalStateException("Attempted to translate composite transform " + - "but no translator was found: " + currentCompositeTransform); - } - } - this.depth--; - System.out.println(genSpaces(this.depth) + "leaveCompositeTransform- " + formatNodeName(node)); - } - - @Override - public void visitTransform(TransformTreeNode node) { - System.out.println(genSpaces(this.depth) + "visitTransform- " + formatNodeName(node)); - if (currentCompositeTransform != null) { - // ignore it - return; - } - - // get the transformation corresponding to the node we are - // currently visiting and translate it into its Flink alternative. - - PTransform<?, ?> transform = node.getTransform(); - StreamTransformTranslator<?> translator = FlinkStreamingTransformTranslators.getTranslator(transform); - if (translator == null) { - System.out.println(node.getTransform().getClass()); - throw new UnsupportedOperationException("The transform " + transform + " is currently not supported."); - } - applyStreamingTransform(transform, node, translator); - } - - @Override - public void visitValue(PValue value, TransformTreeNode producer) { - // do nothing here - } - - private <T extends PTransform<?, ?>> void applyStreamingTransform(PTransform<?, ?> transform, TransformTreeNode node, StreamTransformTranslator<?> translator) { - - @SuppressWarnings("unchecked") - T typedTransform = (T) transform; - - @SuppressWarnings("unchecked") - StreamTransformTranslator<T> typedTranslator = (StreamTransformTranslator<T>) translator; - - // create the applied PTransform on the streamingContext - streamingContext.setCurrentTransform(AppliedPTransform.of( - node.getFullName(), node.getInput(), node.getOutput(), (PTransform) transform)); - typedTranslator.translateNode(typedTransform, streamingContext); - } - - /** - * The interface that every Flink translator of a Beam operator should implement. - * This interface is for <b>streaming</b> jobs. For examples of such translators see - * {@link FlinkStreamingTransformTranslators}.
- */ - public interface StreamTransformTranslator<Type extends PTransform> { - void translateNode(Type transform, FlinkStreamingTranslationContext context); - } - - private static String genSpaces(int n) { - String s = ""; - for (int i = 0; i < n; i++) { - s += "| "; - } - return s; - } - - private static String formatNodeName(TransformTreeNode node) { - return node.toString().split("@")[1] + node.getTransform(); - } + /** The necessary context in the case of a streaming job. */ + private final FlinkStreamingTranslationContext streamingContext; + + private int depth = 0; + + /** Composite transform that we want to translate before proceeding with other transforms. */ + private PTransform<?, ?> currentCompositeTransform; + + public FlinkStreamingPipelineTranslator(StreamExecutionEnvironment env, PipelineOptions options) { + this.streamingContext = new FlinkStreamingTranslationContext(env, options); + } + + // -------------------------------------------------------------------------------------------- + // Pipeline Visitor Methods + // -------------------------------------------------------------------------------------------- + + @Override + public void enterCompositeTransform(TransformTreeNode node) { + System.out.println(genSpaces(this.depth) + "enterCompositeTransform- " + formatNodeName(node)); + + PTransform<?, ?> transform = node.getTransform(); + if (transform != null && currentCompositeTransform == null) { + + StreamTransformTranslator<?> translator = FlinkStreamingTransformTranslators.getTranslator(transform); + if (translator != null) { + currentCompositeTransform = transform; + } + } + this.depth++; + } + + @Override + public void leaveCompositeTransform(TransformTreeNode node) { + PTransform<?, ?> transform = node.getTransform(); + if (transform != null && currentCompositeTransform == transform) { + + StreamTransformTranslator<?> translator = FlinkStreamingTransformTranslators.getTranslator(transform); + if (translator != null) { + System.out.println(genSpaces(this.depth) + "doingCompositeTransform- " + formatNodeName(node)); + applyStreamingTransform(transform, node, translator); + currentCompositeTransform = null; + } else { + throw new IllegalStateException("Attempted to translate composite transform " + + "but no translator was found: " + currentCompositeTransform); + } + } + this.depth--; + System.out.println(genSpaces(this.depth) + "leaveCompositeTransform- " + formatNodeName(node)); + } + + @Override + public void visitTransform(TransformTreeNode node) { + System.out.println(genSpaces(this.depth) + "visitTransform- " + formatNodeName(node)); + if (currentCompositeTransform != null) { + // ignore it + return; + } + + // get the transformation corresponding to the node we are + // currently visiting and translate it into its Flink alternative.
+ + PTransform<?, ?> transform = node.getTransform(); + StreamTransformTranslator<?> translator = FlinkStreamingTransformTranslators.getTranslator(transform); + if (translator == null) { + System.out.println(node.getTransform().getClass()); + throw new UnsupportedOperationException("The transform " + transform + " is currently not supported."); + } + applyStreamingTransform(transform, node, translator); + } + + @Override + public void visitValue(PValue value, TransformTreeNode producer) { + // do nothing here + } + + private <T extends PTransform<?, ?>> void applyStreamingTransform(PTransform<?, ?> transform, TransformTreeNode node, StreamTransformTranslator<?> translator) { + + @SuppressWarnings("unchecked") + T typedTransform = (T) transform; + + @SuppressWarnings("unchecked") + StreamTransformTranslator<T> typedTranslator = (StreamTransformTranslator<T>) translator; + + // create the applied PTransform on the streamingContext + streamingContext.setCurrentTransform(AppliedPTransform.of( + node.getFullName(), node.getInput(), node.getOutput(), (PTransform) transform)); + typedTranslator.translateNode(typedTransform, streamingContext); + } + + /** + * The interface that every Flink translator of a Beam operator should implement. + * This interface is for <b>streaming</b> jobs. For examples of such translators see + * {@link FlinkStreamingTransformTranslators}. + */ + public interface StreamTransformTranslator<Type extends PTransform> { + void translateNode(Type transform, FlinkStreamingTranslationContext context); + } + + private static String genSpaces(int n) { + String s = ""; + for (int i = 0; i < n; i++) { + s += "| "; + } + return s; + } + + private static String formatNodeName(TransformTreeNode node) { + return node.toString().split("@")[1] + node.getTransform(); + } } http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/8852eb15/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/FlinkStreamingTransformTranslators.java ---------------------------------------------------------------------- diff --git a/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/FlinkStreamingTransformTranslators.java b/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/FlinkStreamingTransformTranslators.java index 99dbedb..9fd33be 100644 --- a/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/FlinkStreamingTransformTranslators.java +++ b/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/FlinkStreamingTransformTranslators.java @@ -59,346 +59,346 @@ import java.util.*; */ public class FlinkStreamingTransformTranslators { - // -------------------------------------------------------------------------------------------- - // Transform Translator Registry - // -------------------------------------------------------------------------------------------- - - @SuppressWarnings("rawtypes") - private static final Map<Class<? extends PTransform>, FlinkStreamingPipelineTranslator.StreamTransformTranslator> TRANSLATORS = new HashMap<>(); - - // here you can find all the available translators. 
- static { - TRANSLATORS.put(Create.Values.class, new CreateStreamingTranslator()); - TRANSLATORS.put(Read.Unbounded.class, new UnboundedReadSourceTranslator()); - TRANSLATORS.put(ParDo.Bound.class, new ParDoBoundStreamingTranslator()); - TRANSLATORS.put(TextIO.Write.Bound.class, new TextIOWriteBoundStreamingTranslator()); - TRANSLATORS.put(Window.Bound.class, new WindowBoundTranslator()); - TRANSLATORS.put(GroupByKey.class, new GroupByKeyTranslator()); - TRANSLATORS.put(Combine.PerKey.class, new CombinePerKeyTranslator()); - TRANSLATORS.put(Flatten.FlattenPCollectionList.class, new FlattenPCollectionTranslator()); - TRANSLATORS.put(ParDo.BoundMulti.class, new ParDoBoundMultiStreamingTranslator()); - } - - public static FlinkStreamingPipelineTranslator.StreamTransformTranslator<?> getTranslator(PTransform<?, ?> transform) { - return TRANSLATORS.get(transform.getClass()); - } - - // -------------------------------------------------------------------------------------------- - // Transformation Implementations - // -------------------------------------------------------------------------------------------- - - private static class CreateStreamingTranslator<OUT> implements - FlinkStreamingPipelineTranslator.StreamTransformTranslator<Create.Values<OUT>> { - - @Override - public void translateNode(Create.Values<OUT> transform, FlinkStreamingTranslationContext context) { - PCollection<OUT> output = context.getOutput(transform); - Iterable<OUT> elements = transform.getElements(); - - // we need to serialize the elements to byte arrays, since they might contain - // elements that are not serializable by Java serialization. We deserialize them - // in the FlatMap function using the Coder. - - List<byte[]> serializedElements = Lists.newArrayList(); - Coder<OUT> elementCoder = context.getOutput(transform).getCoder(); - for (OUT element: elements) { - ByteArrayOutputStream bao = new ByteArrayOutputStream(); - try { - elementCoder.encode(element, bao, Coder.Context.OUTER); - serializedElements.add(bao.toByteArray()); - } catch (IOException e) { - throw new RuntimeException("Could not serialize Create elements using Coder: " + e); - } - } - - - DataStream<Integer> initDataSet = context.getExecutionEnvironment().fromElements(1); - - FlinkStreamingCreateFunction<Integer, OUT> createFunction = - new FlinkStreamingCreateFunction<>(serializedElements, elementCoder); - - WindowedValue.ValueOnlyWindowedValueCoder<OUT> windowCoder = WindowedValue.getValueOnlyCoder(elementCoder); - TypeInformation<WindowedValue<OUT>> outputType = new CoderTypeInformation<>(windowCoder); - - DataStream<WindowedValue<OUT>> outputDataStream = initDataSet.flatMap(createFunction) - .returns(outputType); - - context.setOutputDataStream(context.getOutput(transform), outputDataStream); - } - } - - - private static class TextIOWriteBoundStreamingTranslator<T> implements FlinkStreamingPipelineTranslator.StreamTransformTranslator<TextIO.Write.Bound<T>> { - private static final Logger LOG = LoggerFactory.getLogger(TextIOWriteBoundStreamingTranslator.class); - - @Override - public void translateNode(TextIO.Write.Bound<T> transform, FlinkStreamingTranslationContext context) { - PValue input = context.getInput(transform); - DataStream<WindowedValue<T>> inputDataStream = context.getInputDataStream(input); - - String filenamePrefix = transform.getFilenamePrefix(); - String filenameSuffix = transform.getFilenameSuffix(); - boolean needsValidation = transform.needsValidation(); - int numShards = transform.getNumShards(); - String shardNameTemplate 
= transform.getShardNameTemplate(); - - // TODO: Implement these. We need Flink support for this. - LOG.warn("Translation of TextIO.Write.needsValidation not yet supported. Is: {}.", needsValidation); - LOG.warn("Translation of TextIO.Write.filenameSuffix not yet supported. Is: {}.", filenameSuffix); - LOG.warn("Translation of TextIO.Write.shardNameTemplate not yet supported. Is: {}.", shardNameTemplate); - - DataStream<String> dataSink = inputDataStream.flatMap(new FlatMapFunction<WindowedValue<T>, String>() { - @Override - public void flatMap(WindowedValue<T> value, Collector<String> out) throws Exception { - out.collect(value.getValue().toString()); - } - }); - DataStreamSink<String> output = dataSink.writeAsText(filenamePrefix, FileSystem.WriteMode.OVERWRITE); - - if (numShards > 0) { - output.setParallelism(numShards); - } - } - } - - private static class UnboundedReadSourceTranslator<T> implements FlinkStreamingPipelineTranslator.StreamTransformTranslator<Read.Unbounded<T>> { - - @Override - public void translateNode(Read.Unbounded<T> transform, FlinkStreamingTranslationContext context) { - PCollection<T> output = context.getOutput(transform); - - DataStream<WindowedValue<T>> source; - if (transform.getSource().getClass().equals(UnboundedFlinkSource.class)) { - UnboundedFlinkSource flinkSource = (UnboundedFlinkSource) transform.getSource(); - source = context.getExecutionEnvironment() - .addSource(flinkSource.getFlinkSource()) - .flatMap(new FlatMapFunction<String, WindowedValue<String>>() { - @Override - public void flatMap(String s, Collector<WindowedValue<String>> collector) throws Exception { - collector.collect(WindowedValue.<String>of(s, Instant.now(), GlobalWindow.INSTANCE, PaneInfo.NO_FIRING)); - } - }); - } else { - source = context.getExecutionEnvironment() - .addSource(new UnboundedSourceWrapper<>(context.getPipelineOptions(), transform)); - } - context.setOutputDataStream(output, source); - } - } - - private static class ParDoBoundStreamingTranslator<IN, OUT> implements FlinkStreamingPipelineTranslator.StreamTransformTranslator<ParDo.Bound<IN, OUT>> { - - @Override - public void translateNode(ParDo.Bound<IN, OUT> transform, FlinkStreamingTranslationContext context) { - PCollection<OUT> output = context.getOutput(transform); - - final WindowingStrategy<OUT, ? extends BoundedWindow> windowingStrategy = - (WindowingStrategy<OUT, ? 
extends BoundedWindow>) - context.getOutput(transform).getWindowingStrategy(); - - WindowedValue.WindowedValueCoder<OUT> outputStreamCoder = WindowedValue.getFullCoder(output.getCoder(), - windowingStrategy.getWindowFn().windowCoder()); - CoderTypeInformation<WindowedValue<OUT>> outputWindowedValueCoder = - new CoderTypeInformation<>(outputStreamCoder); - - FlinkParDoBoundWrapper<IN, OUT> doFnWrapper = new FlinkParDoBoundWrapper<>( - context.getPipelineOptions(), windowingStrategy, transform.getFn()); - DataStream<WindowedValue<IN>> inputDataStream = context.getInputDataStream(context.getInput(transform)); - SingleOutputStreamOperator<WindowedValue<OUT>> outDataStream = inputDataStream.flatMap(doFnWrapper) - .returns(outputWindowedValueCoder); - - context.setOutputDataStream(context.getOutput(transform), outDataStream); - } - } - - public static class WindowBoundTranslator<T> implements FlinkStreamingPipelineTranslator.StreamTransformTranslator<Window.Bound<T>> { - - @Override - public void translateNode(Window.Bound<T> transform, FlinkStreamingTranslationContext context) { - PValue input = context.getInput(transform); - DataStream<WindowedValue<T>> inputDataStream = context.getInputDataStream(input); - - final WindowingStrategy<T, ? extends BoundedWindow> windowingStrategy = - (WindowingStrategy<T, ? extends BoundedWindow>) - context.getOutput(transform).getWindowingStrategy(); - - final WindowFn<T, ? extends BoundedWindow> windowFn = windowingStrategy.getWindowFn(); - - WindowedValue.WindowedValueCoder<T> outputStreamCoder = WindowedValue.getFullCoder( - context.getInput(transform).getCoder(), windowingStrategy.getWindowFn().windowCoder()); - CoderTypeInformation<WindowedValue<T>> outputWindowedValueCoder = - new CoderTypeInformation<>(outputStreamCoder); - - final FlinkParDoBoundWrapper<T, T> windowDoFnAssigner = new FlinkParDoBoundWrapper<>( - context.getPipelineOptions(), windowingStrategy, createWindowAssigner(windowFn)); - - SingleOutputStreamOperator<WindowedValue<T>> windowedStream = - inputDataStream.flatMap(windowDoFnAssigner).returns(outputWindowedValueCoder); - context.setOutputDataStream(context.getOutput(transform), windowedStream); - } - - private static <T, W extends BoundedWindow> DoFn<T, T> createWindowAssigner(final WindowFn<T, W> windowFn) { - return new DoFn<T, T>() { - - @Override - public void processElement(final ProcessContext c) throws Exception { - Collection<W> windows = windowFn.assignWindows( - windowFn.new AssignContext() { - @Override - public T element() { - return c.element(); - } - - @Override - public Instant timestamp() { - return c.timestamp(); - } - - @Override - public Collection<? 
extends BoundedWindow> windows() { - return c.windowingInternals().windows(); - } - }); - - c.windowingInternals().outputWindowedValue( - c.element(), c.timestamp(), windows, c.pane()); - } - }; - } - } - - public static class GroupByKeyTranslator<K, V> implements FlinkStreamingPipelineTranslator.StreamTransformTranslator<GroupByKey<K, V>> { - - @Override - public void translateNode(GroupByKey<K, V> transform, FlinkStreamingTranslationContext context) { - PValue input = context.getInput(transform); - - DataStream<WindowedValue<KV<K, V>>> inputDataStream = context.getInputDataStream(input); - KvCoder<K, V> inputKvCoder = (KvCoder<K, V>) context.getInput(transform).getCoder(); - - KeyedStream<WindowedValue<KV<K, V>>, K> groupByKStream = FlinkGroupByKeyWrapper - .groupStreamByKey(inputDataStream, inputKvCoder); - - DataStream<WindowedValue<KV<K, Iterable<V>>>> groupedByKNWstream = - FlinkGroupAlsoByWindowWrapper.createForIterable(context.getPipelineOptions(), - context.getInput(transform), groupByKStream); - - context.setOutputDataStream(context.getOutput(transform), groupedByKNWstream); - } - } - - public static class CombinePerKeyTranslator<K, VIN, VACC, VOUT> implements FlinkStreamingPipelineTranslator.StreamTransformTranslator<Combine.PerKey<K, VIN, VOUT>> { - - @Override - public void translateNode(Combine.PerKey<K, VIN, VOUT> transform, FlinkStreamingTranslationContext context) { - PValue input = context.getInput(transform); - - DataStream<WindowedValue<KV<K, VIN>>> inputDataStream = context.getInputDataStream(input); - KvCoder<K, VIN> inputKvCoder = (KvCoder<K, VIN>) context.getInput(transform).getCoder(); - KvCoder<K, VOUT> outputKvCoder = (KvCoder<K, VOUT>) context.getOutput(transform).getCoder(); - - KeyedStream<WindowedValue<KV<K, VIN>>, K> groupByKStream = FlinkGroupByKeyWrapper - .groupStreamByKey(inputDataStream, inputKvCoder); - - Combine.KeyedCombineFn<K, VIN, VACC, VOUT> combineFn = (Combine.KeyedCombineFn<K, VIN, VACC, VOUT>) transform.getFn(); - DataStream<WindowedValue<KV<K, VOUT>>> groupedByKNWstream = - FlinkGroupAlsoByWindowWrapper.create(context.getPipelineOptions(), - context.getInput(transform), groupByKStream, combineFn, outputKvCoder); - - context.setOutputDataStream(context.getOutput(transform), groupedByKNWstream); - } - } - - public static class FlattenPCollectionTranslator<T> implements FlinkStreamingPipelineTranslator.StreamTransformTranslator<Flatten.FlattenPCollectionList<T>> { - - @Override - public void translateNode(Flatten.FlattenPCollectionList<T> transform, FlinkStreamingTranslationContext context) { - List<PCollection<T>> allInputs = context.getInput(transform).getAll(); - DataStream<T> result = null; - for (PCollection<T> collection : allInputs) { - DataStream<T> current = context.getInputDataStream(collection); - result = (result == null) ? current : result.union(current); - } - context.setOutputDataStream(context.getOutput(transform), result); - } - } - - public static class ParDoBoundMultiStreamingTranslator<IN, OUT> implements FlinkStreamingPipelineTranslator.StreamTransformTranslator<ParDo.BoundMulti<IN, OUT>> { - - private final int MAIN_TAG_INDEX = 0; - - @Override - public void translateNode(ParDo.BoundMulti<IN, OUT> transform, FlinkStreamingTranslationContext context) { - - // we assume that the transformation does not change the windowing strategy. - WindowingStrategy<?, ? 
extends BoundedWindow> windowingStrategy = context.getInput(transform).getWindowingStrategy(); - - Map<TupleTag<?>, PCollection<?>> outputs = context.getOutput(transform).getAll(); - Map<TupleTag<?>, Integer> tagsToLabels = transformTupleTagsToLabels( - transform.getMainOutputTag(), outputs.keySet()); - - UnionCoder intermUnionCoder = getIntermUnionCoder(outputs.values()); - WindowedValue.WindowedValueCoder<RawUnionValue> outputStreamCoder = WindowedValue.getFullCoder( - intermUnionCoder, windowingStrategy.getWindowFn().windowCoder()); - - CoderTypeInformation<WindowedValue<RawUnionValue>> intermWindowedValueCoder = - new CoderTypeInformation<>(outputStreamCoder); - - FlinkParDoBoundMultiWrapper<IN, OUT> doFnWrapper = new FlinkParDoBoundMultiWrapper<>( - context.getPipelineOptions(), windowingStrategy, transform.getFn(), - transform.getMainOutputTag(), tagsToLabels); - - DataStream<WindowedValue<IN>> inputDataStream = context.getInputDataStream(context.getInput(transform)); - SingleOutputStreamOperator<WindowedValue<RawUnionValue>> intermDataStream = - inputDataStream.flatMap(doFnWrapper).returns(intermWindowedValueCoder); - - for (Map.Entry<TupleTag<?>, PCollection<?>> output : outputs.entrySet()) { - final int outputTag = tagsToLabels.get(output.getKey()); - - WindowedValue.WindowedValueCoder<?> coderForTag = WindowedValue.getFullCoder( - output.getValue().getCoder(), - windowingStrategy.getWindowFn().windowCoder()); - - CoderTypeInformation<WindowedValue<?>> windowedValueCoder = - new CoderTypeInformation(coderForTag); - - context.setOutputDataStream(output.getValue(), - intermDataStream.filter(new FilterFunction<WindowedValue<RawUnionValue>>() { - @Override - public boolean filter(WindowedValue<RawUnionValue> value) throws Exception { - return value.getValue().getUnionTag() == outputTag; - } - }).flatMap(new FlatMapFunction<WindowedValue<RawUnionValue>, WindowedValue<?>>() { - @Override - public void flatMap(WindowedValue<RawUnionValue> value, Collector<WindowedValue<?>> collector) throws Exception { - collector.collect(WindowedValue.of( - value.getValue().getValue(), - value.getTimestamp(), - value.getWindows(), - value.getPane())); - } - }).returns(windowedValueCoder)); - } - } - - private Map<TupleTag<?>, Integer> transformTupleTagsToLabels(TupleTag<?> mainTag, Set<TupleTag<?>> secondaryTags) { - Map<TupleTag<?>, Integer> tagToLabelMap = Maps.newHashMap(); - tagToLabelMap.put(mainTag, MAIN_TAG_INDEX); - int count = MAIN_TAG_INDEX + 1; - for (TupleTag<?> tag : secondaryTags) { - if (!tagToLabelMap.containsKey(tag)) { - tagToLabelMap.put(tag, count++); - } - } - return tagToLabelMap; - } - - private UnionCoder getIntermUnionCoder(Collection<PCollection<?>> taggedCollections) { - List<Coder<?>> outputCoders = Lists.newArrayList(); - for (PCollection<?> coll : taggedCollections) { - outputCoders.add(coll.getCoder()); - } - return UnionCoder.of(outputCoders); - } - } + // -------------------------------------------------------------------------------------------- + // Transform Translator Registry + // -------------------------------------------------------------------------------------------- + + @SuppressWarnings("rawtypes") + private static final Map<Class<? extends PTransform>, FlinkStreamingPipelineTranslator.StreamTransformTranslator> TRANSLATORS = new HashMap<>(); + + // here you can find all the available translators. 
+ static { + TRANSLATORS.put(Create.Values.class, new CreateStreamingTranslator()); + TRANSLATORS.put(Read.Unbounded.class, new UnboundedReadSourceTranslator()); + TRANSLATORS.put(ParDo.Bound.class, new ParDoBoundStreamingTranslator()); + TRANSLATORS.put(TextIO.Write.Bound.class, new TextIOWriteBoundStreamingTranslator()); + TRANSLATORS.put(Window.Bound.class, new WindowBoundTranslator()); + TRANSLATORS.put(GroupByKey.class, new GroupByKeyTranslator()); + TRANSLATORS.put(Combine.PerKey.class, new CombinePerKeyTranslator()); + TRANSLATORS.put(Flatten.FlattenPCollectionList.class, new FlattenPCollectionTranslator()); + TRANSLATORS.put(ParDo.BoundMulti.class, new ParDoBoundMultiStreamingTranslator()); + } + + public static FlinkStreamingPipelineTranslator.StreamTransformTranslator<?> getTranslator(PTransform<?, ?> transform) { + return TRANSLATORS.get(transform.getClass()); + } + + // -------------------------------------------------------------------------------------------- + // Transformation Implementations + // -------------------------------------------------------------------------------------------- + + private static class CreateStreamingTranslator<OUT> implements + FlinkStreamingPipelineTranslator.StreamTransformTranslator<Create.Values<OUT>> { + + @Override + public void translateNode(Create.Values<OUT> transform, FlinkStreamingTranslationContext context) { + PCollection<OUT> output = context.getOutput(transform); + Iterable<OUT> elements = transform.getElements(); + + // we need to serialize the elements to byte arrays, since they might contain + // elements that are not serializable by Java serialization. We deserialize them + // in the FlatMap function using the Coder. + + List<byte[]> serializedElements = Lists.newArrayList(); + Coder<OUT> elementCoder = context.getOutput(transform).getCoder(); + for (OUT element: elements) { + ByteArrayOutputStream bao = new ByteArrayOutputStream(); + try { + elementCoder.encode(element, bao, Coder.Context.OUTER); + serializedElements.add(bao.toByteArray()); + } catch (IOException e) { + throw new RuntimeException("Could not serialize Create elements using Coder: " + e); + } + } + + + DataStream<Integer> initDataSet = context.getExecutionEnvironment().fromElements(1); + + FlinkStreamingCreateFunction<Integer, OUT> createFunction = + new FlinkStreamingCreateFunction<>(serializedElements, elementCoder); + + WindowedValue.ValueOnlyWindowedValueCoder<OUT> windowCoder = WindowedValue.getValueOnlyCoder(elementCoder); + TypeInformation<WindowedValue<OUT>> outputType = new CoderTypeInformation<>(windowCoder); + + DataStream<WindowedValue<OUT>> outputDataStream = initDataSet.flatMap(createFunction) + .returns(outputType); + + context.setOutputDataStream(context.getOutput(transform), outputDataStream); + } + } + + + private static class TextIOWriteBoundStreamingTranslator<T> implements FlinkStreamingPipelineTranslator.StreamTransformTranslator<TextIO.Write.Bound<T>> { + private static final Logger LOG = LoggerFactory.getLogger(TextIOWriteBoundStreamingTranslator.class); + + @Override + public void translateNode(TextIO.Write.Bound<T> transform, FlinkStreamingTranslationContext context) { + PValue input = context.getInput(transform); + DataStream<WindowedValue<T>> inputDataStream = context.getInputDataStream(input); + + String filenamePrefix = transform.getFilenamePrefix(); + String filenameSuffix = transform.getFilenameSuffix(); + boolean needsValidation = transform.needsValidation(); + int numShards = transform.getNumShards(); + String shardNameTemplate 
= transform.getShardNameTemplate(); + + // TODO: Implement these. We need Flink support for this. + LOG.warn("Translation of TextIO.Write.needsValidation not yet supported. Is: {}.", needsValidation); + LOG.warn("Translation of TextIO.Write.filenameSuffix not yet supported. Is: {}.", filenameSuffix); + LOG.warn("Translation of TextIO.Write.shardNameTemplate not yet supported. Is: {}.", shardNameTemplate); + + DataStream<String> dataSink = inputDataStream.flatMap(new FlatMapFunction<WindowedValue<T>, String>() { + @Override + public void flatMap(WindowedValue<T> value, Collector<String> out) throws Exception { + out.collect(value.getValue().toString()); + } + }); + DataStreamSink<String> output = dataSink.writeAsText(filenamePrefix, FileSystem.WriteMode.OVERWRITE); + + if (numShards > 0) { + output.setParallelism(numShards); + } + } + } + + private static class UnboundedReadSourceTranslator<T> implements FlinkStreamingPipelineTranslator.StreamTransformTranslator<Read.Unbounded<T>> { + + @Override + public void translateNode(Read.Unbounded<T> transform, FlinkStreamingTranslationContext context) { + PCollection<T> output = context.getOutput(transform); + + DataStream<WindowedValue<T>> source; + if (transform.getSource().getClass().equals(UnboundedFlinkSource.class)) { + UnboundedFlinkSource flinkSource = (UnboundedFlinkSource) transform.getSource(); + source = context.getExecutionEnvironment() + .addSource(flinkSource.getFlinkSource()) + .flatMap(new FlatMapFunction<String, WindowedValue<String>>() { + @Override + public void flatMap(String s, Collector<WindowedValue<String>> collector) throws Exception { + collector.collect(WindowedValue.<String>of(s, Instant.now(), GlobalWindow.INSTANCE, PaneInfo.NO_FIRING)); + } + }); + } else { + source = context.getExecutionEnvironment() + .addSource(new UnboundedSourceWrapper<>(context.getPipelineOptions(), transform)); + } + context.setOutputDataStream(output, source); + } + } + + private static class ParDoBoundStreamingTranslator<IN, OUT> implements FlinkStreamingPipelineTranslator.StreamTransformTranslator<ParDo.Bound<IN, OUT>> { + + @Override + public void translateNode(ParDo.Bound<IN, OUT> transform, FlinkStreamingTranslationContext context) { + PCollection<OUT> output = context.getOutput(transform); + + final WindowingStrategy<OUT, ? extends BoundedWindow> windowingStrategy = + (WindowingStrategy<OUT, ? 
extends BoundedWindow>) + context.getOutput(transform).getWindowingStrategy(); + + WindowedValue.WindowedValueCoder<OUT> outputStreamCoder = WindowedValue.getFullCoder(output.getCoder(), + windowingStrategy.getWindowFn().windowCoder()); + CoderTypeInformation<WindowedValue<OUT>> outputWindowedValueCoder = + new CoderTypeInformation<>(outputStreamCoder); + + FlinkParDoBoundWrapper<IN, OUT> doFnWrapper = new FlinkParDoBoundWrapper<>( + context.getPipelineOptions(), windowingStrategy, transform.getFn()); + DataStream<WindowedValue<IN>> inputDataStream = context.getInputDataStream(context.getInput(transform)); + SingleOutputStreamOperator<WindowedValue<OUT>> outDataStream = inputDataStream.flatMap(doFnWrapper) + .returns(outputWindowedValueCoder); + + context.setOutputDataStream(context.getOutput(transform), outDataStream); + } + } + + public static class WindowBoundTranslator<T> implements FlinkStreamingPipelineTranslator.StreamTransformTranslator<Window.Bound<T>> { + + @Override + public void translateNode(Window.Bound<T> transform, FlinkStreamingTranslationContext context) { + PValue input = context.getInput(transform); + DataStream<WindowedValue<T>> inputDataStream = context.getInputDataStream(input); + + final WindowingStrategy<T, ? extends BoundedWindow> windowingStrategy = + (WindowingStrategy<T, ? extends BoundedWindow>) + context.getOutput(transform).getWindowingStrategy(); + + final WindowFn<T, ? extends BoundedWindow> windowFn = windowingStrategy.getWindowFn(); + + WindowedValue.WindowedValueCoder<T> outputStreamCoder = WindowedValue.getFullCoder( + context.getInput(transform).getCoder(), windowingStrategy.getWindowFn().windowCoder()); + CoderTypeInformation<WindowedValue<T>> outputWindowedValueCoder = + new CoderTypeInformation<>(outputStreamCoder); + + final FlinkParDoBoundWrapper<T, T> windowDoFnAssigner = new FlinkParDoBoundWrapper<>( + context.getPipelineOptions(), windowingStrategy, createWindowAssigner(windowFn)); + + SingleOutputStreamOperator<WindowedValue<T>> windowedStream = + inputDataStream.flatMap(windowDoFnAssigner).returns(outputWindowedValueCoder); + context.setOutputDataStream(context.getOutput(transform), windowedStream); + } + + private static <T, W extends BoundedWindow> DoFn<T, T> createWindowAssigner(final WindowFn<T, W> windowFn) { + return new DoFn<T, T>() { + + @Override + public void processElement(final ProcessContext c) throws Exception { + Collection<W> windows = windowFn.assignWindows( + windowFn.new AssignContext() { + @Override + public T element() { + return c.element(); + } + + @Override + public Instant timestamp() { + return c.timestamp(); + } + + @Override + public Collection<? 
extends BoundedWindow> windows() { + return c.windowingInternals().windows(); + } + }); + + c.windowingInternals().outputWindowedValue( + c.element(), c.timestamp(), windows, c.pane()); + } + }; + } + } + + public static class GroupByKeyTranslator<K, V> implements FlinkStreamingPipelineTranslator.StreamTransformTranslator<GroupByKey<K, V>> { + + @Override + public void translateNode(GroupByKey<K, V> transform, FlinkStreamingTranslationContext context) { + PValue input = context.getInput(transform); + + DataStream<WindowedValue<KV<K, V>>> inputDataStream = context.getInputDataStream(input); + KvCoder<K, V> inputKvCoder = (KvCoder<K, V>) context.getInput(transform).getCoder(); + + KeyedStream<WindowedValue<KV<K, V>>, K> groupByKStream = FlinkGroupByKeyWrapper + .groupStreamByKey(inputDataStream, inputKvCoder); + + DataStream<WindowedValue<KV<K, Iterable<V>>>> groupedByKNWstream = + FlinkGroupAlsoByWindowWrapper.createForIterable(context.getPipelineOptions(), + context.getInput(transform), groupByKStream); + + context.setOutputDataStream(context.getOutput(transform), groupedByKNWstream); + } + } + + public static class CombinePerKeyTranslator<K, VIN, VACC, VOUT> implements FlinkStreamingPipelineTranslator.StreamTransformTranslator<Combine.PerKey<K, VIN, VOUT>> { + + @Override + public void translateNode(Combine.PerKey<K, VIN, VOUT> transform, FlinkStreamingTranslationContext context) { + PValue input = context.getInput(transform); + + DataStream<WindowedValue<KV<K, VIN>>> inputDataStream = context.getInputDataStream(input); + KvCoder<K, VIN> inputKvCoder = (KvCoder<K, VIN>) context.getInput(transform).getCoder(); + KvCoder<K, VOUT> outputKvCoder = (KvCoder<K, VOUT>) context.getOutput(transform).getCoder(); + + KeyedStream<WindowedValue<KV<K, VIN>>, K> groupByKStream = FlinkGroupByKeyWrapper + .groupStreamByKey(inputDataStream, inputKvCoder); + + Combine.KeyedCombineFn<K, VIN, VACC, VOUT> combineFn = (Combine.KeyedCombineFn<K, VIN, VACC, VOUT>) transform.getFn(); + DataStream<WindowedValue<KV<K, VOUT>>> groupedByKNWstream = + FlinkGroupAlsoByWindowWrapper.create(context.getPipelineOptions(), + context.getInput(transform), groupByKStream, combineFn, outputKvCoder); + + context.setOutputDataStream(context.getOutput(transform), groupedByKNWstream); + } + } + + public static class FlattenPCollectionTranslator<T> implements FlinkStreamingPipelineTranslator.StreamTransformTranslator<Flatten.FlattenPCollectionList<T>> { + + @Override + public void translateNode(Flatten.FlattenPCollectionList<T> transform, FlinkStreamingTranslationContext context) { + List<PCollection<T>> allInputs = context.getInput(transform).getAll(); + DataStream<T> result = null; + for (PCollection<T> collection : allInputs) { + DataStream<T> current = context.getInputDataStream(collection); + result = (result == null) ? current : result.union(current); + } + context.setOutputDataStream(context.getOutput(transform), result); + } + } + + public static class ParDoBoundMultiStreamingTranslator<IN, OUT> implements FlinkStreamingPipelineTranslator.StreamTransformTranslator<ParDo.BoundMulti<IN, OUT>> { + + private final int MAIN_TAG_INDEX = 0; + + @Override + public void translateNode(ParDo.BoundMulti<IN, OUT> transform, FlinkStreamingTranslationContext context) { + + // we assume that the transformation does not change the windowing strategy. + WindowingStrategy<?, ? 
extends BoundedWindow> windowingStrategy = context.getInput(transform).getWindowingStrategy(); + + Map<TupleTag<?>, PCollection<?>> outputs = context.getOutput(transform).getAll(); + Map<TupleTag<?>, Integer> tagsToLabels = transformTupleTagsToLabels( + transform.getMainOutputTag(), outputs.keySet()); + + UnionCoder intermUnionCoder = getIntermUnionCoder(outputs.values()); + WindowedValue.WindowedValueCoder<RawUnionValue> outputStreamCoder = WindowedValue.getFullCoder( + intermUnionCoder, windowingStrategy.getWindowFn().windowCoder()); + + CoderTypeInformation<WindowedValue<RawUnionValue>> intermWindowedValueCoder = + new CoderTypeInformation<>(outputStreamCoder); + + FlinkParDoBoundMultiWrapper<IN, OUT> doFnWrapper = new FlinkParDoBoundMultiWrapper<>( + context.getPipelineOptions(), windowingStrategy, transform.getFn(), + transform.getMainOutputTag(), tagsToLabels); + + DataStream<WindowedValue<IN>> inputDataStream = context.getInputDataStream(context.getInput(transform)); + SingleOutputStreamOperator<WindowedValue<RawUnionValue>> intermDataStream = + inputDataStream.flatMap(doFnWrapper).returns(intermWindowedValueCoder); + + for (Map.Entry<TupleTag<?>, PCollection<?>> output : outputs.entrySet()) { + final int outputTag = tagsToLabels.get(output.getKey()); + + WindowedValue.WindowedValueCoder<?> coderForTag = WindowedValue.getFullCoder( + output.getValue().getCoder(), + windowingStrategy.getWindowFn().windowCoder()); + + CoderTypeInformation<WindowedValue<?>> windowedValueCoder = + new CoderTypeInformation(coderForTag); + + context.setOutputDataStream(output.getValue(), + intermDataStream.filter(new FilterFunction<WindowedValue<RawUnionValue>>() { + @Override + public boolean filter(WindowedValue<RawUnionValue> value) throws Exception { + return value.getValue().getUnionTag() == outputTag; + } + }).flatMap(new FlatMapFunction<WindowedValue<RawUnionValue>, WindowedValue<?>>() { + @Override + public void flatMap(WindowedValue<RawUnionValue> value, Collector<WindowedValue<?>> collector) throws Exception { + collector.collect(WindowedValue.of( + value.getValue().getValue(), + value.getTimestamp(), + value.getWindows(), + value.getPane())); + } + }).returns(windowedValueCoder)); + } + } + + private Map<TupleTag<?>, Integer> transformTupleTagsToLabels(TupleTag<?> mainTag, Set<TupleTag<?>> secondaryTags) { + Map<TupleTag<?>, Integer> tagToLabelMap = Maps.newHashMap(); + tagToLabelMap.put(mainTag, MAIN_TAG_INDEX); + int count = MAIN_TAG_INDEX + 1; + for (TupleTag<?> tag : secondaryTags) { + if (!tagToLabelMap.containsKey(tag)) { + tagToLabelMap.put(tag, count++); + } + } + return tagToLabelMap; + } + + private UnionCoder getIntermUnionCoder(Collection<PCollection<?>> taggedCollections) { + List<Coder<?>> outputCoders = Lists.newArrayList(); + for (PCollection<?> coll : taggedCollections) { + outputCoders.add(coll.getCoder()); + } + return UnionCoder.of(outputCoders); + } + } } http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/8852eb15/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/FlinkStreamingTranslationContext.java ---------------------------------------------------------------------- diff --git a/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/FlinkStreamingTranslationContext.java b/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/FlinkStreamingTranslationContext.java index 7c4ab93..3586d0c 100644 --- a/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/FlinkStreamingTranslationContext.java 
+++ b/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/FlinkStreamingTranslationContext.java @@ -30,58 +30,58 @@ import java.util.Map; public class FlinkStreamingTranslationContext { - private final StreamExecutionEnvironment env; - private final PipelineOptions options; + private final StreamExecutionEnvironment env; + private final PipelineOptions options; - /** - * Keeps a mapping between the output value of the PTransform (in Dataflow) and the - * Flink Operator that produced it, after the translation of the corresponding PTransform - * to its Flink equivalent. - * */ - private final Map<PValue, DataStream<?>> dataStreams; + /** + * Keeps a mapping between the output value of the PTransform (in Dataflow) and the + * Flink Operator that produced it, after the translation of the corresponding PTransform + * to its Flink equivalent. + * */ + private final Map<PValue, DataStream<?>> dataStreams; - private AppliedPTransform<?, ?, ?> currentTransform; + private AppliedPTransform<?, ?, ?> currentTransform; - public FlinkStreamingTranslationContext(StreamExecutionEnvironment env, PipelineOptions options) { - this.env = Preconditions.checkNotNull(env); - this.options = Preconditions.checkNotNull(options); - this.dataStreams = new HashMap<>(); - } + public FlinkStreamingTranslationContext(StreamExecutionEnvironment env, PipelineOptions options) { + this.env = Preconditions.checkNotNull(env); + this.options = Preconditions.checkNotNull(options); + this.dataStreams = new HashMap<>(); + } - public StreamExecutionEnvironment getExecutionEnvironment() { - return env; - } + public StreamExecutionEnvironment getExecutionEnvironment() { + return env; + } - public PipelineOptions getPipelineOptions() { - return options; - } + public PipelineOptions getPipelineOptions() { + return options; + } - @SuppressWarnings("unchecked") - public <T> DataStream<T> getInputDataStream(PValue value) { - return (DataStream<T>) dataStreams.get(value); - } + @SuppressWarnings("unchecked") + public <T> DataStream<T> getInputDataStream(PValue value) { + return (DataStream<T>) dataStreams.get(value); + } - public void setOutputDataStream(PValue value, DataStream<?> set) { - if (!dataStreams.containsKey(value)) { - dataStreams.put(value, set); - } - } + public void setOutputDataStream(PValue value, DataStream<?> set) { + if (!dataStreams.containsKey(value)) { + dataStreams.put(value, set); + } + } - /** - * Sets the AppliedPTransform which carries input/output. - * @param currentTransform - */ - public void setCurrentTransform(AppliedPTransform<?, ?, ?> currentTransform) { - this.currentTransform = currentTransform; - } + /** + * Sets the AppliedPTransform which carries input/output.
+ * @param currentTransform + */ + public void setCurrentTransform(AppliedPTransform<?, ?, ?> currentTransform) { + this.currentTransform = currentTransform; + } - @SuppressWarnings("unchecked") - public <I extends PInput> I getInput(PTransform<I, ?> transform) { - return (I) currentTransform.getInput(); - } + @SuppressWarnings("unchecked") + public <I extends PInput> I getInput(PTransform<I, ?> transform) { + return (I) currentTransform.getInput(); + } - @SuppressWarnings("unchecked") - public <O extends POutput> O getOutput(PTransform<?, O> transform) { - return (O) currentTransform.getOutput(); - } + @SuppressWarnings("unchecked") + public <O extends POutput> O getOutput(PTransform<?, O> transform) { + return (O) currentTransform.getOutput(); + } } http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/8852eb15/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/functions/FlinkCoGroupKeyedListAggregator.java ---------------------------------------------------------------------- diff --git a/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/functions/FlinkCoGroupKeyedListAggregator.java b/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/functions/FlinkCoGroupKeyedListAggregator.java index 4c7fefd..5897473 100644 --- a/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/functions/FlinkCoGroupKeyedListAggregator.java +++ b/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/functions/FlinkCoGroupKeyedListAggregator.java @@ -29,30 +29,30 @@ import java.util.List; public class FlinkCoGroupKeyedListAggregator<K,V1,V2> implements CoGroupFunction<KV<K,V1>, KV<K,V2>, KV<K, CoGbkResult>>{ - private CoGbkResultSchema schema; - private TupleTag<?> tupleTag1; - private TupleTag<?> tupleTag2; + private CoGbkResultSchema schema; + private TupleTag<?> tupleTag1; + private TupleTag<?> tupleTag2; - public FlinkCoGroupKeyedListAggregator(CoGbkResultSchema schema, TupleTag<?> tupleTag1, TupleTag<?> tupleTag2) { - this.schema = schema; - this.tupleTag1 = tupleTag1; - this.tupleTag2 = tupleTag2; - } + public FlinkCoGroupKeyedListAggregator(CoGbkResultSchema schema, TupleTag<?> tupleTag1, TupleTag<?> tupleTag2) { + this.schema = schema; + this.tupleTag1 = tupleTag1; + this.tupleTag2 = tupleTag2; + } - @Override - public void coGroup(Iterable<KV<K,V1>> first, Iterable<KV<K,V2>> second, Collector<KV<K, CoGbkResult>> out) throws Exception { - K k = null; - List<RawUnionValue> result = new ArrayList<>(); - int index1 = schema.getIndex(tupleTag1); - for (KV<K,?> entry : first) { - k = entry.getKey(); - result.add(new RawUnionValue(index1, entry.getValue())); - } - int index2 = schema.getIndex(tupleTag2); - for (KV<K,?> entry : second) { - k = entry.getKey(); - result.add(new RawUnionValue(index2, entry.getValue())); - } - out.collect(KV.of(k, new CoGbkResult(schema, result))); - } + @Override + public void coGroup(Iterable<KV<K,V1>> first, Iterable<KV<K,V2>> second, Collector<KV<K, CoGbkResult>> out) throws Exception { + K k = null; + List<RawUnionValue> result = new ArrayList<>(); + int index1 = schema.getIndex(tupleTag1); + for (KV<K,?> entry : first) { + k = entry.getKey(); + result.add(new RawUnionValue(index1, entry.getValue())); + } + int index2 = schema.getIndex(tupleTag2); + for (KV<K,?> entry : second) { + k = entry.getKey(); + result.add(new RawUnionValue(index2, entry.getValue())); + } + out.collect(KV.of(k, new CoGbkResult(schema, result))); + } } 
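For illustration, the aggregator above is the kind of CoGroupFunction that would be wired into Flink's batch coGroup API roughly as follows; the input DataSets, key selectors, and the schema/tag variables in this sketch are assumptions for the example, not part of this commit:

    DataSet<KV<K, CoGbkResult>> coGbkResults = leftInput
        .coGroup(rightInput)
        .where(new KeySelector<KV<K, V1>, K>() {
          @Override
          public K getKey(KV<K, V1> kv) throws Exception {
            return kv.getKey();  // co-group both sides on the KV key
          }
        })
        .equalTo(new KeySelector<KV<K, V2>, K>() {
          @Override
          public K getKey(KV<K, V2> kv) throws Exception {
            return kv.getKey();
          }
        })
        .with(new FlinkCoGroupKeyedListAggregator<K, V1, V2>(schema, tupleTag1, tupleTag2));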
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/8852eb15/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/functions/FlinkCreateFunction.java ---------------------------------------------------------------------- diff --git a/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/functions/FlinkCreateFunction.java b/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/functions/FlinkCreateFunction.java index 21ecaf0..03f2b06 100644 --- a/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/functions/FlinkCreateFunction.java +++ b/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/functions/FlinkCreateFunction.java @@ -32,29 +32,29 @@ import java.util.List; */ public class FlinkCreateFunction<IN, OUT> implements FlatMapFunction<IN, OUT> { - private final List<byte[]> elements; - private final Coder<OUT> coder; - - public FlinkCreateFunction(List<byte[]> elements, Coder<OUT> coder) { - this.elements = elements; - this.coder = coder; - } - - @Override - @SuppressWarnings("unchecked") - public void flatMap(IN value, Collector<OUT> out) throws Exception { - - for (byte[] element : elements) { - ByteArrayInputStream bai = new ByteArrayInputStream(element); - OUT outValue = coder.decode(bai, Coder.Context.OUTER); - if (outValue == null) { - // TODO Flink doesn't allow null values in records - out.collect((OUT) VoidCoderTypeSerializer.VoidValue.INSTANCE); - } else { - out.collect(outValue); - } - } - - out.close(); - } + private final List<byte[]> elements; + private final Coder<OUT> coder; + + public FlinkCreateFunction(List<byte[]> elements, Coder<OUT> coder) { + this.elements = elements; + this.coder = coder; + } + + @Override + @SuppressWarnings("unchecked") + public void flatMap(IN value, Collector<OUT> out) throws Exception { + + for (byte[] element : elements) { + ByteArrayInputStream bai = new ByteArrayInputStream(element); + OUT outValue = coder.decode(bai, Coder.Context.OUTER); + if (outValue == null) { + // TODO Flink doesn't allow null values in records + out.collect((OUT) VoidCoderTypeSerializer.VoidValue.INSTANCE); + } else { + out.collect(outValue); + } + } + + out.close(); + } } http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/8852eb15/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/functions/FlinkDoFnFunction.java ---------------------------------------------------------------------- diff --git a/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/functions/FlinkDoFnFunction.java b/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/functions/FlinkDoFnFunction.java index 9c57d4e..53ff1cf 100644 --- a/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/functions/FlinkDoFnFunction.java +++ b/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/functions/FlinkDoFnFunction.java @@ -49,154 +49,154 @@ import java.util.List; */ public class FlinkDoFnFunction<IN, OUT> extends RichMapPartitionFunction<IN, OUT> { - private final DoFn<IN, OUT> doFn; - private transient PipelineOptions options; - - public FlinkDoFnFunction(DoFn<IN, OUT> doFn, PipelineOptions options) { - this.doFn = doFn; - this.options = options; - } - - private void writeObject(ObjectOutputStream out) - throws IOException, ClassNotFoundException { - out.defaultWriteObject(); - ObjectMapper mapper = new ObjectMapper(); - mapper.writeValue(out, options); - } - - private void readObject(ObjectInputStream in) 
- throws IOException, ClassNotFoundException { - in.defaultReadObject(); - ObjectMapper mapper = new ObjectMapper(); - options = mapper.readValue(in, PipelineOptions.class); - } - - @Override - public void mapPartition(Iterable<IN> values, Collector<OUT> out) throws Exception { - ProcessContext context = new ProcessContext(doFn, out); - this.doFn.startBundle(context); - for (IN value : values) { - context.inValue = value; - doFn.processElement(context); - } - this.doFn.finishBundle(context); - } - - private class ProcessContext extends DoFn<IN, OUT>.ProcessContext { - - IN inValue; - Collector<OUT> outCollector; - - public ProcessContext(DoFn<IN, OUT> fn, Collector<OUT> outCollector) { - fn.super(); - super.setupDelegateAggregators(); - this.outCollector = outCollector; - } - - @Override - public IN element() { - return this.inValue; - } - - - @Override - public Instant timestamp() { - return Instant.now(); - } - - @Override - public BoundedWindow window() { - return GlobalWindow.INSTANCE; - } - - @Override - public PaneInfo pane() { - return PaneInfo.NO_FIRING; - } - - @Override - public WindowingInternals<IN, OUT> windowingInternals() { - return new WindowingInternals<IN, OUT>() { - @Override - public StateInternals stateInternals() { - return null; - } - - @Override - public void outputWindowedValue(OUT output, Instant timestamp, Collection<? extends BoundedWindow> windows, PaneInfo pane) { - - } - - @Override - public TimerInternals timerInternals() { - return null; - } - - @Override - public Collection<? extends BoundedWindow> windows() { - return ImmutableList.of(GlobalWindow.INSTANCE); - } - - @Override - public PaneInfo pane() { - return PaneInfo.NO_FIRING; - } - - @Override - public <T> void writePCollectionViewData(TupleTag<?> tag, Iterable<WindowedValue<T>> data, Coder<T> elemCoder) throws IOException { - } - - @Override - public <T> T sideInput(PCollectionView<T> view, BoundedWindow mainInputWindow) { - throw new RuntimeException("sideInput() not implemented."); - } - }; - } - - @Override - public PipelineOptions getPipelineOptions() { - return options; - } - - @Override - public <T> T sideInput(PCollectionView<T> view) { - List<T> sideInput = getRuntimeContext().getBroadcastVariable(view.getTagInternal().getId()); - List<WindowedValue<?>> windowedValueList = new ArrayList<>(sideInput.size()); - for (T input : sideInput) { - windowedValueList.add(WindowedValue.of(input, Instant.now(), ImmutableList.of(GlobalWindow.INSTANCE), pane())); - } - return view.fromIterableInternal(windowedValueList); - } - - @Override - public void output(OUT output) { - outCollector.collect(output); - } - - @Override - public void outputWithTimestamp(OUT output, Instant timestamp) { - // not FLink's way, just output normally - output(output); - } - - @Override - public <T> void sideOutput(TupleTag<T> tag, T output) { - // ignore the side output, this can happen when a user does not register - // side outputs but then outputs using a freshly created TupleTag. 
- } - - @Override - public <T> void sideOutputWithTimestamp(TupleTag<T> tag, T output, Instant timestamp) { - sideOutput(tag, output); - } - - @Override - protected <AggInputT, AggOutputT> Aggregator<AggInputT, AggOutputT> createAggregatorInternal(String name, Combine.CombineFn<AggInputT, ?, AggOutputT> combiner) { - SerializableFnAggregatorWrapper<AggInputT, AggOutputT> wrapper = new SerializableFnAggregatorWrapper<>(combiner); - getRuntimeContext().addAccumulator(name, wrapper); - return wrapper; - } - - - } + private final DoFn<IN, OUT> doFn; + private transient PipelineOptions options; + + public FlinkDoFnFunction(DoFn<IN, OUT> doFn, PipelineOptions options) { + this.doFn = doFn; + this.options = options; + } + + private void writeObject(ObjectOutputStream out) + throws IOException, ClassNotFoundException { + out.defaultWriteObject(); + ObjectMapper mapper = new ObjectMapper(); + mapper.writeValue(out, options); + } + + private void readObject(ObjectInputStream in) + throws IOException, ClassNotFoundException { + in.defaultReadObject(); + ObjectMapper mapper = new ObjectMapper(); + options = mapper.readValue(in, PipelineOptions.class); + } + + @Override + public void mapPartition(Iterable<IN> values, Collector<OUT> out) throws Exception { + ProcessContext context = new ProcessContext(doFn, out); + this.doFn.startBundle(context); + for (IN value : values) { + context.inValue = value; + doFn.processElement(context); + } + this.doFn.finishBundle(context); + } + + private class ProcessContext extends DoFn<IN, OUT>.ProcessContext { + + IN inValue; + Collector<OUT> outCollector; + + public ProcessContext(DoFn<IN, OUT> fn, Collector<OUT> outCollector) { + fn.super(); + super.setupDelegateAggregators(); + this.outCollector = outCollector; + } + + @Override + public IN element() { + return this.inValue; + } + + + @Override + public Instant timestamp() { + return Instant.now(); + } + + @Override + public BoundedWindow window() { + return GlobalWindow.INSTANCE; + } + + @Override + public PaneInfo pane() { + return PaneInfo.NO_FIRING; + } + + @Override + public WindowingInternals<IN, OUT> windowingInternals() { + return new WindowingInternals<IN, OUT>() { + @Override + public StateInternals stateInternals() { + return null; + } + + @Override + public void outputWindowedValue(OUT output, Instant timestamp, Collection<? extends BoundedWindow> windows, PaneInfo pane) { + + } + + @Override + public TimerInternals timerInternals() { + return null; + } + + @Override + public Collection<? 
extends BoundedWindow> windows() { + return ImmutableList.of(GlobalWindow.INSTANCE); + } + + @Override + public PaneInfo pane() { + return PaneInfo.NO_FIRING; + } + + @Override + public <T> void writePCollectionViewData(TupleTag<?> tag, Iterable<WindowedValue<T>> data, Coder<T> elemCoder) throws IOException { + } + + @Override + public <T> T sideInput(PCollectionView<T> view, BoundedWindow mainInputWindow) { + throw new RuntimeException("sideInput() not implemented."); + } + }; + } + + @Override + public PipelineOptions getPipelineOptions() { + return options; + } + + @Override + public <T> T sideInput(PCollectionView<T> view) { + List<T> sideInput = getRuntimeContext().getBroadcastVariable(view.getTagInternal().getId()); + List<WindowedValue<?>> windowedValueList = new ArrayList<>(sideInput.size()); + for (T input : sideInput) { + windowedValueList.add(WindowedValue.of(input, Instant.now(), ImmutableList.of(GlobalWindow.INSTANCE), pane())); + } + return view.fromIterableInternal(windowedValueList); + } + + @Override + public void output(OUT output) { + outCollector.collect(output); + } + + @Override + public void outputWithTimestamp(OUT output, Instant timestamp) { + // not FLink's way, just output normally + output(output); + } + + @Override + public <T> void sideOutput(TupleTag<T> tag, T output) { + // ignore the side output, this can happen when a user does not register + // side outputs but then outputs using a freshly created TupleTag. + } + + @Override + public <T> void sideOutputWithTimestamp(TupleTag<T> tag, T output, Instant timestamp) { + sideOutput(tag, output); + } + + @Override + protected <AggInputT, AggOutputT> Aggregator<AggInputT, AggOutputT> createAggregatorInternal(String name, Combine.CombineFn<AggInputT, ?, AggOutputT> combiner) { + SerializableFnAggregatorWrapper<AggInputT, AggOutputT> wrapper = new SerializableFnAggregatorWrapper<>(combiner); + getRuntimeContext().addAccumulator(name, wrapper); + return wrapper; + } + + + } } http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/8852eb15/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/functions/FlinkKeyedListAggregationFunction.java ---------------------------------------------------------------------- diff --git a/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/functions/FlinkKeyedListAggregationFunction.java b/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/functions/FlinkKeyedListAggregationFunction.java index 5d3702a..0116972 100644 --- a/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/functions/FlinkKeyedListAggregationFunction.java +++ b/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/functions/FlinkKeyedListAggregationFunction.java @@ -29,47 +29,47 @@ import java.util.Iterator; */ public class FlinkKeyedListAggregationFunction<K,V> implements GroupReduceFunction<KV<K, V>, KV<K, Iterable<V>>> { - @Override - public void reduce(Iterable<KV<K, V>> values, Collector<KV<K, Iterable<V>>> out) throws Exception { - Iterator<KV<K, V>> it = values.iterator(); - KV<K, V> first = it.next(); - Iterable<V> passThrough = new PassThroughIterable<>(first, it); - out.collect(KV.of(first.getKey(), passThrough)); - } + @Override + public void reduce(Iterable<KV<K, V>> values, Collector<KV<K, Iterable<V>>> out) throws Exception { + Iterator<KV<K, V>> it = values.iterator(); + KV<K, V> first = it.next(); + Iterable<V> passThrough = new PassThroughIterable<>(first, it); + out.collect(KV.of(first.getKey(), 
passThrough)); + } - private static class PassThroughIterable<K, V> implements Iterable<V>, Iterator<V> { - private KV<K, V> first; - private Iterator<KV<K, V>> iterator; + private static class PassThroughIterable<K, V> implements Iterable<V>, Iterator<V> { + private KV<K, V> first; + private Iterator<KV<K, V>> iterator; - public PassThroughIterable(KV<K, V> first, Iterator<KV<K, V>> iterator) { - this.first = first; - this.iterator = iterator; - } + public PassThroughIterable(KV<K, V> first, Iterator<KV<K, V>> iterator) { + this.first = first; + this.iterator = iterator; + } - @Override - public Iterator<V> iterator() { - return this; - } + @Override + public Iterator<V> iterator() { + return this; + } - @Override - public boolean hasNext() { - return first != null || iterator.hasNext(); - } + @Override + public boolean hasNext() { + return first != null || iterator.hasNext(); + } - @Override - public V next() { - if (first != null) { - V result = first.getValue(); - first = null; - return result; - } else { - return iterator.next().getValue(); - } - } + @Override + public V next() { + if (first != null) { + V result = first.getValue(); + first = null; + return result; + } else { + return iterator.next().getValue(); + } + } - @Override - public void remove() { - throw new UnsupportedOperationException("Cannot remove elements from input."); - } - } + @Override + public void remove() { + throw new UnsupportedOperationException("Cannot remove elements from input."); + } + } } http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/8852eb15/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/functions/FlinkMultiOutputDoFnFunction.java ---------------------------------------------------------------------- diff --git a/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/functions/FlinkMultiOutputDoFnFunction.java b/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/functions/FlinkMultiOutputDoFnFunction.java index 6187182..9e51638 100644 --- a/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/functions/FlinkMultiOutputDoFnFunction.java +++ b/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/functions/FlinkMultiOutputDoFnFunction.java @@ -51,125 +51,125 @@ import java.util.Map; */ public class FlinkMultiOutputDoFnFunction<IN, OUT> extends RichMapPartitionFunction<IN, RawUnionValue> { - private final DoFn<IN, OUT> doFn; - private transient PipelineOptions options; - private final Map<TupleTag<?>, Integer> outputMap; - - public FlinkMultiOutputDoFnFunction(DoFn<IN, OUT> doFn, PipelineOptions options, Map<TupleTag<?>, Integer> outputMap) { - this.doFn = doFn; - this.options = options; - this.outputMap = outputMap; - } - - private void writeObject(ObjectOutputStream out) - throws IOException, ClassNotFoundException { - out.defaultWriteObject(); - ObjectMapper mapper = new ObjectMapper(); - mapper.writeValue(out, options); - } - - private void readObject(ObjectInputStream in) - throws IOException, ClassNotFoundException { - in.defaultReadObject(); - ObjectMapper mapper = new ObjectMapper(); - options = mapper.readValue(in, PipelineOptions.class); - - } - - @Override - public void mapPartition(Iterable<IN> values, Collector<RawUnionValue> out) throws Exception { - ProcessContext context = new ProcessContext(doFn, out); - this.doFn.startBundle(context); - for (IN value : values) { - context.inValue = value; - doFn.processElement(context); - } - this.doFn.finishBundle(context); - } - - private 
class ProcessContext extends DoFn<IN, OUT>.ProcessContext { - - IN inValue; - Collector<RawUnionValue> outCollector; - - public ProcessContext(DoFn<IN, OUT> fn, Collector<RawUnionValue> outCollector) { - fn.super(); - this.outCollector = outCollector; - } - - @Override - public IN element() { - return this.inValue; - } - - @Override - public Instant timestamp() { - return Instant.now(); - } - - @Override - public BoundedWindow window() { - return GlobalWindow.INSTANCE; - } - - @Override - public PaneInfo pane() { - return PaneInfo.NO_FIRING; - } - - @Override - public WindowingInternals<IN, OUT> windowingInternals() { - return null; - } - - @Override - public PipelineOptions getPipelineOptions() { - return options; - } - - @Override - public <T> T sideInput(PCollectionView<T> view) { - List<T> sideInput = getRuntimeContext().getBroadcastVariable(view.getTagInternal() - .getId()); - List<WindowedValue<?>> windowedValueList = new ArrayList<>(sideInput.size()); - for (T input : sideInput) { - windowedValueList.add(WindowedValue.of(input, Instant.now(), ImmutableList.of(GlobalWindow.INSTANCE), pane())); - } - return view.fromIterableInternal(windowedValueList); - } - - @Override - public void output(OUT value) { - // assume that index 0 is the default output - outCollector.collect(new RawUnionValue(0, value)); - } - - @Override - public void outputWithTimestamp(OUT output, Instant timestamp) { - // not FLink's way, just output normally - output(output); - } - - @Override - @SuppressWarnings("unchecked") - public <T> void sideOutput(TupleTag<T> tag, T value) { - Integer index = outputMap.get(tag); - if (index != null) { - outCollector.collect(new RawUnionValue(index, value)); - } - } - - @Override - public <T> void sideOutputWithTimestamp(TupleTag<T> tag, T output, Instant timestamp) { - sideOutput(tag, output); - } - - @Override - protected <AggInputT, AggOutputT> Aggregator<AggInputT, AggOutputT> createAggregatorInternal(String name, Combine.CombineFn<AggInputT, ?, AggOutputT> combiner) { - SerializableFnAggregatorWrapper<AggInputT, AggOutputT> wrapper = new SerializableFnAggregatorWrapper<>(combiner); - getRuntimeContext().addAccumulator(name, wrapper); - return null; - } - - } + private final DoFn<IN, OUT> doFn; + private transient PipelineOptions options; + private final Map<TupleTag<?>, Integer> outputMap; + + public FlinkMultiOutputDoFnFunction(DoFn<IN, OUT> doFn, PipelineOptions options, Map<TupleTag<?>, Integer> outputMap) { + this.doFn = doFn; + this.options = options; + this.outputMap = outputMap; + } + + private void writeObject(ObjectOutputStream out) + throws IOException, ClassNotFoundException { + out.defaultWriteObject(); + ObjectMapper mapper = new ObjectMapper(); + mapper.writeValue(out, options); + } + + private void readObject(ObjectInputStream in) + throws IOException, ClassNotFoundException { + in.defaultReadObject(); + ObjectMapper mapper = new ObjectMapper(); + options = mapper.readValue(in, PipelineOptions.class); + + } + + @Override + public void mapPartition(Iterable<IN> values, Collector<RawUnionValue> out) throws Exception { + ProcessContext context = new ProcessContext(doFn, out); + this.doFn.startBundle(context); + for (IN value : values) { + context.inValue = value; + doFn.processElement(context); + } + this.doFn.finishBundle(context); + } + + private class ProcessContext extends DoFn<IN, OUT>.ProcessContext { + + IN inValue; + Collector<RawUnionValue> outCollector; + + public ProcessContext(DoFn<IN, OUT> fn, Collector<RawUnionValue> outCollector) { + fn.super(); 
+ this.outCollector = outCollector; + } + + @Override + public IN element() { + return this.inValue; + } + + @Override + public Instant timestamp() { + return Instant.now(); + } + + @Override + public BoundedWindow window() { + return GlobalWindow.INSTANCE; + } + + @Override + public PaneInfo pane() { + return PaneInfo.NO_FIRING; + } + + @Override + public WindowingInternals<IN, OUT> windowingInternals() { + return null; + } + + @Override + public PipelineOptions getPipelineOptions() { + return options; + } + + @Override + public <T> T sideInput(PCollectionView<T> view) { + List<T> sideInput = getRuntimeContext().getBroadcastVariable(view.getTagInternal() + .getId()); + List<WindowedValue<?>> windowedValueList = new ArrayList<>(sideInput.size()); + for (T input : sideInput) { + windowedValueList.add(WindowedValue.of(input, Instant.now(), ImmutableList.of(GlobalWindow.INSTANCE), pane())); + } + return view.fromIterableInternal(windowedValueList); + } + + @Override + public void output(OUT value) { + // assume that index 0 is the default output + outCollector.collect(new RawUnionValue(0, value)); + } + + @Override + public void outputWithTimestamp(OUT output, Instant timestamp) { + // not FLink's way, just output normally + output(output); + } + + @Override + @SuppressWarnings("unchecked") + public <T> void sideOutput(TupleTag<T> tag, T value) { + Integer index = outputMap.get(tag); + if (index != null) { + outCollector.collect(new RawUnionValue(index, value)); + } + } + + @Override + public <T> void sideOutputWithTimestamp(TupleTag<T> tag, T output, Instant timestamp) { + sideOutput(tag, output); + } + + @Override + protected <AggInputT, AggOutputT> Aggregator<AggInputT, AggOutputT> createAggregatorInternal(String name, Combine.CombineFn<AggInputT, ?, AggOutputT> combiner) { + SerializableFnAggregatorWrapper<AggInputT, AggOutputT> wrapper = new SerializableFnAggregatorWrapper<>(combiner); + getRuntimeContext().addAccumulator(name, wrapper); + return null; + } + + } } http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/8852eb15/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/functions/FlinkMultiOutputPruningFunction.java ---------------------------------------------------------------------- diff --git a/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/functions/FlinkMultiOutputPruningFunction.java b/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/functions/FlinkMultiOutputPruningFunction.java index 6792b23..e883d42 100644 --- a/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/functions/FlinkMultiOutputPruningFunction.java +++ b/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/functions/FlinkMultiOutputPruningFunction.java @@ -25,17 +25,17 @@ import org.apache.flink.util.Collector; */ public class FlinkMultiOutputPruningFunction<T> implements FlatMapFunction<RawUnionValue, T> { - private final int outputTag; + private final int outputTag; - public FlinkMultiOutputPruningFunction(int outputTag) { - this.outputTag = outputTag; - } + public FlinkMultiOutputPruningFunction(int outputTag) { + this.outputTag = outputTag; + } - @Override - @SuppressWarnings("unchecked") - public void flatMap(RawUnionValue rawUnionValue, Collector<T> collector) throws Exception { - if (rawUnionValue.getUnionTag() == outputTag) { - collector.collect((T) rawUnionValue.getValue()); - } - } + @Override + @SuppressWarnings("unchecked") + public void flatMap(RawUnionValue rawUnionValue, Collector<T> 
collector) throws Exception { + if (rawUnionValue.getUnionTag() == outputTag) { + collector.collect((T) rawUnionValue.getValue()); + } + } } http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/8852eb15/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/functions/FlinkPartialReduceFunction.java ---------------------------------------------------------------------- diff --git a/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/functions/FlinkPartialReduceFunction.java b/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/functions/FlinkPartialReduceFunction.java index ef47b72..1ff9aff 100644 --- a/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/functions/FlinkPartialReduceFunction.java +++ b/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/functions/FlinkPartialReduceFunction.java @@ -32,29 +32,29 @@ import java.util.Iterator; */ public class FlinkPartialReduceFunction<K, VI, VA> implements GroupCombineFunction<KV<K, VI>, KV<K, VA>> { - private final Combine.KeyedCombineFn<K, VI, VA, ?> keyedCombineFn; - - public FlinkPartialReduceFunction(Combine.KeyedCombineFn<K, VI, VA, ?> - keyedCombineFn) { - this.keyedCombineFn = keyedCombineFn; - } - - @Override - public void combine(Iterable<KV<K, VI>> elements, Collector<KV<K, VA>> out) throws Exception { - - final Iterator<KV<K, VI>> iterator = elements.iterator(); - // create accumulator using the first elements key - KV<K, VI> first = iterator.next(); - K key = first.getKey(); - VI value = first.getValue(); - VA accumulator = keyedCombineFn.createAccumulator(key); - accumulator = keyedCombineFn.addInput(key, accumulator, value); - - while(iterator.hasNext()) { - value = iterator.next().getValue(); - accumulator = keyedCombineFn.addInput(key, accumulator, value); - } - - out.collect(KV.of(key, accumulator)); - } + private final Combine.KeyedCombineFn<K, VI, VA, ?> keyedCombineFn; + + public FlinkPartialReduceFunction(Combine.KeyedCombineFn<K, VI, VA, ?> + keyedCombineFn) { + this.keyedCombineFn = keyedCombineFn; + } + + @Override + public void combine(Iterable<KV<K, VI>> elements, Collector<KV<K, VA>> out) throws Exception { + + final Iterator<KV<K, VI>> iterator = elements.iterator(); + // create accumulator using the first elements key + KV<K, VI> first = iterator.next(); + K key = first.getKey(); + VI value = first.getValue(); + VA accumulator = keyedCombineFn.createAccumulator(key); + accumulator = keyedCombineFn.addInput(key, accumulator, value); + + while(iterator.hasNext()) { + value = iterator.next().getValue(); + accumulator = keyedCombineFn.addInput(key, accumulator, value); + } + + out.collect(KV.of(key, accumulator)); + } } http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/8852eb15/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/functions/FlinkReduceFunction.java ---------------------------------------------------------------------- diff --git a/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/functions/FlinkReduceFunction.java b/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/functions/FlinkReduceFunction.java index cd0b38c..94676a2 100644 --- a/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/functions/FlinkReduceFunction.java +++ b/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/functions/FlinkReduceFunction.java @@ -33,25 +33,25 @@ import java.util.Iterator; */ public class 
FlinkReduceFunction<K, VA, VO> implements GroupReduceFunction<KV<K, VA>, KV<K, VO>> { - private final Combine.KeyedCombineFn<K, ?, VA, VO> keyedCombineFn; + private final Combine.KeyedCombineFn<K, ?, VA, VO> keyedCombineFn; - public FlinkReduceFunction(Combine.KeyedCombineFn<K, ?, VA, VO> keyedCombineFn) { - this.keyedCombineFn = keyedCombineFn; - } + public FlinkReduceFunction(Combine.KeyedCombineFn<K, ?, VA, VO> keyedCombineFn) { + this.keyedCombineFn = keyedCombineFn; + } - @Override - public void reduce(Iterable<KV<K, VA>> values, Collector<KV<K, VO>> out) throws Exception { - Iterator<KV<K, VA>> it = values.iterator(); + @Override + public void reduce(Iterable<KV<K, VA>> values, Collector<KV<K, VO>> out) throws Exception { + Iterator<KV<K, VA>> it = values.iterator(); - KV<K, VA> current = it.next(); - K k = current.getKey(); - VA accumulator = current.getValue(); + KV<K, VA> current = it.next(); + K k = current.getKey(); + VA accumulator = current.getValue(); - while (it.hasNext()) { - current = it.next(); - keyedCombineFn.mergeAccumulators(k, ImmutableList.of(accumulator, current.getValue()) ); - } + while (it.hasNext()) { + current = it.next(); + keyedCombineFn.mergeAccumulators(k, ImmutableList.of(accumulator, current.getValue()) ); + } - out.collect(KV.of(k, keyedCombineFn.extractOutput(k, accumulator))); - } + out.collect(KV.of(k, keyedCombineFn.extractOutput(k, accumulator))); + } }