http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/8852eb15/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/FlinkBatchTranslationContext.java
----------------------------------------------------------------------
diff --git 
a/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/FlinkBatchTranslationContext.java
 
b/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/FlinkBatchTranslationContext.java
index 1072fa3..ec1d6c8 100644
--- 
a/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/FlinkBatchTranslationContext.java
+++ 
b/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/FlinkBatchTranslationContext.java
@@ -38,92 +38,92 @@ import java.util.HashMap;
 import java.util.Map;
 
 public class FlinkBatchTranslationContext {
-       
-       private final Map<PValue, DataSet<?>> dataSets;
-       private final Map<PCollectionView<?>, DataSet<?>> broadcastDataSets;
+  
+  private final Map<PValue, DataSet<?>> dataSets;
+  private final Map<PCollectionView<?>, DataSet<?>> broadcastDataSets;
 
-       private final ExecutionEnvironment env;
-       private final PipelineOptions options;
+  private final ExecutionEnvironment env;
+  private final PipelineOptions options;
 
-       private AppliedPTransform<?, ?, ?> currentTransform;
-       
-       // ------------------------------------------------------------------------
-       
-       public FlinkBatchTranslationContext(ExecutionEnvironment env, 
PipelineOptions options) {
-               this.env = env;
-               this.options = options;
-               this.dataSets = new HashMap<>();
-               this.broadcastDataSets = new HashMap<>();
-       }
-       
-       // ------------------------------------------------------------------------
-       
-       public ExecutionEnvironment getExecutionEnvironment() {
-               return env;
-       }
+  private AppliedPTransform<?, ?, ?> currentTransform;
+  
+  // ------------------------------------------------------------------------
+  
+  public FlinkBatchTranslationContext(ExecutionEnvironment env, 
PipelineOptions options) {
+    this.env = env;
+    this.options = options;
+    this.dataSets = new HashMap<>();
+    this.broadcastDataSets = new HashMap<>();
+  }
+  
+  // ------------------------------------------------------------------------
+  
+  public ExecutionEnvironment getExecutionEnvironment() {
+    return env;
+  }
 
-       public PipelineOptions getPipelineOptions() {
-               return options;
-       }
-       
-       @SuppressWarnings("unchecked")
-       public <T> DataSet<T> getInputDataSet(PValue value) {
-               return (DataSet<T>) dataSets.get(value);
-       }
+  public PipelineOptions getPipelineOptions() {
+    return options;
+  }
+  
+  @SuppressWarnings("unchecked")
+  public <T> DataSet<T> getInputDataSet(PValue value) {
+    return (DataSet<T>) dataSets.get(value);
+  }
 
-       public void setOutputDataSet(PValue value, DataSet<?> set) {
-               if (!dataSets.containsKey(value)) {
-                       dataSets.put(value, set);
-               }
-       }
+  public void setOutputDataSet(PValue value, DataSet<?> set) {
+    if (!dataSets.containsKey(value)) {
+      dataSets.put(value, set);
+    }
+  }
 
-       /**
-        * Sets the AppliedPTransform which carries input/output.
-        * @param currentTransform
-        */
-       public void setCurrentTransform(AppliedPTransform<?, ?, ?> 
currentTransform) {
-               this.currentTransform = currentTransform;
-       }
+  /**
+   * Sets the AppliedPTransform which carries input/output.
+   * @param currentTransform
+   */
+  public void setCurrentTransform(AppliedPTransform<?, ?, ?> currentTransform) 
{
+    this.currentTransform = currentTransform;
+  }
 
-       @SuppressWarnings("unchecked")
-       public <T> DataSet<T> getSideInputDataSet(PCollectionView<?> value) {
-               return (DataSet<T>) broadcastDataSets.get(value);
-       }
+  @SuppressWarnings("unchecked")
+  public <T> DataSet<T> getSideInputDataSet(PCollectionView<?> value) {
+    return (DataSet<T>) broadcastDataSets.get(value);
+  }
 
-       public void setSideInputDataSet(PCollectionView<?> value, DataSet<?> 
set) {
-               if (!broadcastDataSets.containsKey(value)) {
-                       broadcastDataSets.put(value, set);
-               }
-       }
-       
-       @SuppressWarnings("unchecked")
-       public <T> TypeInformation<T> getTypeInfo(PInput output) {
-               if (output instanceof TypedPValue) {
-                       Coder<?> outputCoder = ((TypedPValue) 
output).getCoder();
-                       if (outputCoder instanceof KvCoder) {
-                               return new KvCoderTypeInformation((KvCoder) 
outputCoder);
-                       } else {
-                               return new CoderTypeInformation(outputCoder);
-                       }
-               }
-               return new GenericTypeInfo<>((Class<T>)Object.class);
-       }
+  public void setSideInputDataSet(PCollectionView<?> value, DataSet<?> set) {
+    if (!broadcastDataSets.containsKey(value)) {
+      broadcastDataSets.put(value, set);
+    }
+  }
+  
+  @SuppressWarnings("unchecked")
+  public <T> TypeInformation<T> getTypeInfo(PInput output) {
+    if (output instanceof TypedPValue) {
+      Coder<?> outputCoder = ((TypedPValue) output).getCoder();
+      if (outputCoder instanceof KvCoder) {
+        return new KvCoderTypeInformation((KvCoder) outputCoder);
+      } else {
+        return new CoderTypeInformation(outputCoder);
+      }
+    }
+    return new GenericTypeInfo<>((Class<T>)Object.class);
+  }
 
-       public <T> TypeInformation<T> getInputTypeInfo() {
-               return getTypeInfo(currentTransform.getInput());
-       }
+  public <T> TypeInformation<T> getInputTypeInfo() {
+    return getTypeInfo(currentTransform.getInput());
+  }
 
-       public <T> TypeInformation<T> getOutputTypeInfo() {
-               return getTypeInfo((PValue) currentTransform.getOutput());
-       }
+  public <T> TypeInformation<T> getOutputTypeInfo() {
+    return getTypeInfo((PValue) currentTransform.getOutput());
+  }
 
-       @SuppressWarnings("unchecked")
-       <I extends PInput> I getInput(PTransform<I, ?> transform) {
-               return (I) currentTransform.getInput();
-       }
+  @SuppressWarnings("unchecked")
+  <I extends PInput> I getInput(PTransform<I, ?> transform) {
+    return (I) currentTransform.getInput();
+  }
 
-       @SuppressWarnings("unchecked")
-       <O extends POutput> O getOutput(PTransform<?, O> transform) {
-               return (O) currentTransform.getOutput();
-       }
+  @SuppressWarnings("unchecked")
+  <O extends POutput> O getOutput(PTransform<?, O> transform) {
+    return (O) currentTransform.getOutput();
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/8852eb15/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/FlinkPipelineTranslator.java
----------------------------------------------------------------------
diff --git 
a/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/FlinkPipelineTranslator.java
 
b/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/FlinkPipelineTranslator.java
index b56fe07..a6a333b 100644
--- 
a/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/FlinkPipelineTranslator.java
+++ 
b/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/FlinkPipelineTranslator.java
@@ -28,7 +28,7 @@ import com.google.cloud.dataflow.sdk.Pipeline;
  */
 public abstract class FlinkPipelineTranslator implements 
Pipeline.PipelineVisitor {
 
-       public void translate(Pipeline pipeline) {
-               pipeline.traverseTopologically(this);
-       }
+  public void translate(Pipeline pipeline) {
+    pipeline.traverseTopologically(this);
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/8852eb15/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/FlinkStreamingPipelineTranslator.java
----------------------------------------------------------------------
diff --git 
a/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/FlinkStreamingPipelineTranslator.java
 
b/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/FlinkStreamingPipelineTranslator.java
index ea9ed14..897303d 100644
--- 
a/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/FlinkStreamingPipelineTranslator.java
+++ 
b/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/FlinkStreamingPipelineTranslator.java
@@ -31,113 +31,113 @@ import 
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
  * */
 public class FlinkStreamingPipelineTranslator extends FlinkPipelineTranslator {
 
-       /** The necessary context in the case of a straming job. */
-       private final FlinkStreamingTranslationContext streamingContext;
-
-       private int depth = 0;
-
-       /** Composite transform that we want to translate before proceeding 
with other transforms. */
-       private PTransform<?, ?> currentCompositeTransform;
-
-       public FlinkStreamingPipelineTranslator(StreamExecutionEnvironment env, 
PipelineOptions options) {
-               this.streamingContext = new 
FlinkStreamingTranslationContext(env, options);
-       }
-
-       // --------------------------------------------------------------------------------------------
-       //  Pipeline Visitor Methods
-       // --------------------------------------------------------------------------------------------
-
-       @Override
-       public void enterCompositeTransform(TransformTreeNode node) {
-               System.out.println(genSpaces(this.depth) + 
"enterCompositeTransform- " + formatNodeName(node));
-
-               PTransform<?, ?> transform = node.getTransform();
-               if (transform != null && currentCompositeTransform == null) {
-
-                       StreamTransformTranslator<?> translator = 
FlinkStreamingTransformTranslators.getTranslator(transform);
-                       if (translator != null) {
-                               currentCompositeTransform = transform;
-                       }
-               }
-               this.depth++;
-       }
-
-       @Override
-       public void leaveCompositeTransform(TransformTreeNode node) {
-               PTransform<?, ?> transform = node.getTransform();
-               if (transform != null && currentCompositeTransform == 
transform) {
-
-                       StreamTransformTranslator<?> translator = 
FlinkStreamingTransformTranslators.getTranslator(transform);
-                       if (translator != null) {
-                               System.out.println(genSpaces(this.depth) + 
"doingCompositeTransform- " + formatNodeName(node));
-                               applyStreamingTransform(transform, node, 
translator);
-                               currentCompositeTransform = null;
-                       } else {
-                               throw new IllegalStateException("Attempted to translate composite transform " +
-                                               "but no translator was found: " + currentCompositeTransform);
-                       }
-               }
-               this.depth--;
-               System.out.println(genSpaces(this.depth) + 
"leaveCompositeTransform- " + formatNodeName(node));
-       }
-
-       @Override
-       public void visitTransform(TransformTreeNode node) {
-               System.out.println(genSpaces(this.depth) + "visitTransform- " + 
formatNodeName(node));
-               if (currentCompositeTransform != null) {
-                       // ignore it
-                       return;
-               }
-
-               // get the transformation corresponding to hte node we are
-               // currently visiting and translate it into its Flink 
alternative.
-
-               PTransform<?, ?> transform = node.getTransform();
-               StreamTransformTranslator<?> translator = 
FlinkStreamingTransformTranslators.getTranslator(transform);
-               if (translator == null) {
-                       System.out.println(node.getTransform().getClass());
-                       throw new UnsupportedOperationException("The transform 
" + transform + " is currently not supported.");
-               }
-               applyStreamingTransform(transform, node, translator);
-       }
-
-       @Override
-       public void visitValue(PValue value, TransformTreeNode producer) {
-               // do nothing here
-       }
-
-       private <T extends PTransform<?, ?>> void 
applyStreamingTransform(PTransform<?, ?> transform, TransformTreeNode node, 
StreamTransformTranslator<?> translator) {
-
-               @SuppressWarnings("unchecked")
-               T typedTransform = (T) transform;
-
-               @SuppressWarnings("unchecked")
-               StreamTransformTranslator<T> typedTranslator = 
(StreamTransformTranslator<T>) translator;
-
-               // create the applied PTransform on the streamingContext
-               streamingContext.setCurrentTransform(AppliedPTransform.of(
-                               node.getFullName(), node.getInput(), 
node.getOutput(), (PTransform) transform));
-               typedTranslator.translateNode(typedTransform, streamingContext);
-       }
-
-       /**
-        * The interface that every Flink translator of a Beam operator should 
implement.
-        * This interface is for <b>streaming</b> jobs. For examples of such 
translators see
-        * {@link FlinkStreamingTransformTranslators}.
-        */
-       public interface StreamTransformTranslator<Type extends PTransform> {
-               void translateNode(Type transform, 
FlinkStreamingTranslationContext context);
-       }
-
-       private static String genSpaces(int n) {
-               String s = "";
-               for (int i = 0; i < n; i++) {
-                       s += "|   ";
-               }
-               return s;
-       }
-
-       private static String formatNodeName(TransformTreeNode node) {
-               return node.toString().split("@")[1] + node.getTransform();
-       }
+  /** The necessary context in the case of a streaming job. */
+  private final FlinkStreamingTranslationContext streamingContext;
+
+  private int depth = 0;
+
+  /** Composite transform that we want to translate before proceeding with 
other transforms. */
+  private PTransform<?, ?> currentCompositeTransform;
+
+  public FlinkStreamingPipelineTranslator(StreamExecutionEnvironment env, 
PipelineOptions options) {
+    this.streamingContext = new FlinkStreamingTranslationContext(env, options);
+  }
+
+  // --------------------------------------------------------------------------------------------
+  //  Pipeline Visitor Methods
+  // --------------------------------------------------------------------------------------------
+
+  @Override
+  public void enterCompositeTransform(TransformTreeNode node) {
+    System.out.println(genSpaces(this.depth) + "enterCompositeTransform- " + 
formatNodeName(node));
+
+    PTransform<?, ?> transform = node.getTransform();
+    if (transform != null && currentCompositeTransform == null) {
+
+      StreamTransformTranslator<?> translator = 
FlinkStreamingTransformTranslators.getTranslator(transform);
+      if (translator != null) {
+        currentCompositeTransform = transform;
+      }
+    }
+    this.depth++;
+  }
+
+  @Override
+  public void leaveCompositeTransform(TransformTreeNode node) {
+    PTransform<?, ?> transform = node.getTransform();
+    if (transform != null && currentCompositeTransform == transform) {
+
+      StreamTransformTranslator<?> translator = 
FlinkStreamingTransformTranslators.getTranslator(transform);
+      if (translator != null) {
+        System.out.println(genSpaces(this.depth) + "doingCompositeTransform- " + formatNodeName(node));
+        applyStreamingTransform(transform, node, translator);
+        currentCompositeTransform = null;
+      } else {
+        throw new IllegalStateException("Attempted to translate composite transform " +
+            "but no translator was found: " + currentCompositeTransform);
+      }
+    }
+    this.depth--;
+    System.out.println(genSpaces(this.depth) + "leaveCompositeTransform- " + 
formatNodeName(node));
+  }
+
+  @Override
+  public void visitTransform(TransformTreeNode node) {
+    System.out.println(genSpaces(this.depth) + "visitTransform- " + 
formatNodeName(node));
+    if (currentCompositeTransform != null) {
+      // ignore it
+      return;
+    }
+
+    // get the transformation corresponding to the node we are
+    // currently visiting and translate it into its Flink alternative.
+
+    PTransform<?, ?> transform = node.getTransform();
+    StreamTransformTranslator<?> translator = 
FlinkStreamingTransformTranslators.getTranslator(transform);
+    if (translator == null) {
+      System.out.println(node.getTransform().getClass());
+      throw new UnsupportedOperationException("The transform " + transform + " 
is currently not supported.");
+    }
+    applyStreamingTransform(transform, node, translator);
+  }
+
+  @Override
+  public void visitValue(PValue value, TransformTreeNode producer) {
+    // do nothing here
+  }
+
+  private <T extends PTransform<?, ?>> void 
applyStreamingTransform(PTransform<?, ?> transform, TransformTreeNode node, 
StreamTransformTranslator<?> translator) {
+
+    @SuppressWarnings("unchecked")
+    T typedTransform = (T) transform;
+
+    @SuppressWarnings("unchecked")
+    StreamTransformTranslator<T> typedTranslator = 
(StreamTransformTranslator<T>) translator;
+
+    // create the applied PTransform on the streamingContext
+    streamingContext.setCurrentTransform(AppliedPTransform.of(
+        node.getFullName(), node.getInput(), node.getOutput(), (PTransform) 
transform));
+    typedTranslator.translateNode(typedTransform, streamingContext);
+  }
+
+  /**
+   * The interface that every Flink translator of a Beam operator should 
implement.
+   * This interface is for <b>streaming</b> jobs. For examples of such 
translators see
+   * {@link FlinkStreamingTransformTranslators}.
+   */
+  public interface StreamTransformTranslator<Type extends PTransform> {
+    void translateNode(Type transform, FlinkStreamingTranslationContext 
context);
+  }
+
+  private static String genSpaces(int n) {
+    String s = "";
+    for (int i = 0; i < n; i++) {
+      s += "|   ";
+    }
+    return s;
+  }
+
+  private static String formatNodeName(TransformTreeNode node) {
+    return node.toString().split("@")[1] + node.getTransform();
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/8852eb15/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/FlinkStreamingTransformTranslators.java
----------------------------------------------------------------------
diff --git 
a/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/FlinkStreamingTransformTranslators.java
 
b/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/FlinkStreamingTransformTranslators.java
index 99dbedb..9fd33be 100644
--- 
a/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/FlinkStreamingTransformTranslators.java
+++ 
b/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/FlinkStreamingTransformTranslators.java
@@ -59,346 +59,346 @@ import java.util.*;
  */
 public class FlinkStreamingTransformTranslators {
 
-       // --------------------------------------------------------------------------------------------
-       //  Transform Translator Registry
-       // --------------------------------------------------------------------------------------------
-
-       @SuppressWarnings("rawtypes")
-       private static final Map<Class<? extends PTransform>, 
FlinkStreamingPipelineTranslator.StreamTransformTranslator> TRANSLATORS = new 
HashMap<>();
-
-       // here you can find all the available translators.
-       static {
-               TRANSLATORS.put(Create.Values.class, new 
CreateStreamingTranslator());
-               TRANSLATORS.put(Read.Unbounded.class, new 
UnboundedReadSourceTranslator());
-               TRANSLATORS.put(ParDo.Bound.class, new 
ParDoBoundStreamingTranslator());
-               TRANSLATORS.put(TextIO.Write.Bound.class, new 
TextIOWriteBoundStreamingTranslator());
-               TRANSLATORS.put(Window.Bound.class, new 
WindowBoundTranslator());
-               TRANSLATORS.put(GroupByKey.class, new GroupByKeyTranslator());
-               TRANSLATORS.put(Combine.PerKey.class, new 
CombinePerKeyTranslator());
-               TRANSLATORS.put(Flatten.FlattenPCollectionList.class, new 
FlattenPCollectionTranslator());
-               TRANSLATORS.put(ParDo.BoundMulti.class, new 
ParDoBoundMultiStreamingTranslator());
-       }
-
-       public static 
FlinkStreamingPipelineTranslator.StreamTransformTranslator<?> 
getTranslator(PTransform<?, ?> transform) {
-               return TRANSLATORS.get(transform.getClass());
-       }
-
-       // --------------------------------------------------------------------------------------------
-       //  Transformation Implementations
-       // --------------------------------------------------------------------------------------------
-
-       private static class CreateStreamingTranslator<OUT> implements
-                       
FlinkStreamingPipelineTranslator.StreamTransformTranslator<Create.Values<OUT>> {
-
-               @Override
-               public void translateNode(Create.Values<OUT> transform, 
FlinkStreamingTranslationContext context) {
-                       PCollection<OUT> output = context.getOutput(transform);
-                       Iterable<OUT> elements = transform.getElements();
-
-                       // we need to serialize the elements to byte arrays, 
since they might contain
-                       // elements that are not serializable by Java 
serialization. We deserialize them
-                       // in the FlatMap function using the Coder.
-
-                       List<byte[]> serializedElements = Lists.newArrayList();
-                       Coder<OUT> elementCoder = 
context.getOutput(transform).getCoder();
-                       for (OUT element: elements) {
-                               ByteArrayOutputStream bao = new 
ByteArrayOutputStream();
-                               try {
-                                       elementCoder.encode(element, bao, 
Coder.Context.OUTER);
-                                       
serializedElements.add(bao.toByteArray());
-                               } catch (IOException e) {
-                                       throw new RuntimeException("Could not 
serialize Create elements using Coder: " + e);
-                               }
-                       }
-
-
-                       DataStream<Integer> initDataSet = 
context.getExecutionEnvironment().fromElements(1);
-
-                       FlinkStreamingCreateFunction<Integer, OUT> 
createFunction =
-                                       new 
FlinkStreamingCreateFunction<>(serializedElements, elementCoder);
-
-                       WindowedValue.ValueOnlyWindowedValueCoder<OUT> 
windowCoder = WindowedValue.getValueOnlyCoder(elementCoder);
-                       TypeInformation<WindowedValue<OUT>> outputType = new 
CoderTypeInformation<>(windowCoder);
-
-                       DataStream<WindowedValue<OUT>> outputDataStream = 
initDataSet.flatMap(createFunction)
-                                       .returns(outputType);
-
-                       
context.setOutputDataStream(context.getOutput(transform), outputDataStream);
-               }
-       }
-
-
-       private static class TextIOWriteBoundStreamingTranslator<T> implements 
FlinkStreamingPipelineTranslator.StreamTransformTranslator<TextIO.Write.Bound<T>>
 {
-               private static final Logger LOG = 
LoggerFactory.getLogger(TextIOWriteBoundStreamingTranslator.class);
-
-               @Override
-               public void translateNode(TextIO.Write.Bound<T> transform, 
FlinkStreamingTranslationContext context) {
-                       PValue input = context.getInput(transform);
-                       DataStream<WindowedValue<T>> inputDataStream = 
context.getInputDataStream(input);
-
-                       String filenamePrefix = transform.getFilenamePrefix();
-                       String filenameSuffix = transform.getFilenameSuffix();
-                       boolean needsValidation = transform.needsValidation();
-                       int numShards = transform.getNumShards();
-                       String shardNameTemplate = 
transform.getShardNameTemplate();
-
-                       // TODO: Implement these. We need Flink support for 
this.
-                       LOG.warn("Translation of TextIO.Write.needsValidation 
not yet supported. Is: {}.", needsValidation);
-                       LOG.warn("Translation of TextIO.Write.filenameSuffix 
not yet supported. Is: {}.", filenameSuffix);
-                       LOG.warn("Translation of TextIO.Write.shardNameTemplate 
not yet supported. Is: {}.", shardNameTemplate);
-
-                       DataStream<String> dataSink = 
inputDataStream.flatMap(new FlatMapFunction<WindowedValue<T>, String>() {
-                               @Override
-                               public void flatMap(WindowedValue<T> value, 
Collector<String> out) throws Exception {
-                                       
out.collect(value.getValue().toString());
-                               }
-                       });
-                       DataStreamSink<String> output = 
dataSink.writeAsText(filenamePrefix, FileSystem.WriteMode.OVERWRITE);
-
-                       if (numShards > 0) {
-                               output.setParallelism(numShards);
-                       }
-               }
-       }
-
-       private static class UnboundedReadSourceTranslator<T> implements 
FlinkStreamingPipelineTranslator.StreamTransformTranslator<Read.Unbounded<T>> {
-
-               @Override
-               public void translateNode(Read.Unbounded<T> transform, 
FlinkStreamingTranslationContext context) {
-                       PCollection<T> output = context.getOutput(transform);
-
-                       DataStream<WindowedValue<T>> source;
-                       if 
(transform.getSource().getClass().equals(UnboundedFlinkSource.class)) {
-                               UnboundedFlinkSource flinkSource = 
(UnboundedFlinkSource) transform.getSource();
-                               source = context.getExecutionEnvironment()
-                                               
.addSource(flinkSource.getFlinkSource())
-                                               .flatMap(new 
FlatMapFunction<String, WindowedValue<String>>() {
-                                                       @Override
-                                                       public void 
flatMap(String s, Collector<WindowedValue<String>> collector) throws Exception {
-                                                               
collector.collect(WindowedValue.<String>of(s, Instant.now(), 
GlobalWindow.INSTANCE, PaneInfo.NO_FIRING));
-                                                       }
-                                               });
-                       } else {
-                               source = context.getExecutionEnvironment()
-                                               .addSource(new 
UnboundedSourceWrapper<>(context.getPipelineOptions(), transform));
-                       }
-                       context.setOutputDataStream(output, source);
-               }
-       }
-
-       private static class ParDoBoundStreamingTranslator<IN, OUT> implements 
FlinkStreamingPipelineTranslator.StreamTransformTranslator<ParDo.Bound<IN, 
OUT>> {
-
-               @Override
-               public void translateNode(ParDo.Bound<IN, OUT> transform, 
FlinkStreamingTranslationContext context) {
-                       PCollection<OUT> output = context.getOutput(transform);
-
-                       final WindowingStrategy<OUT, ? extends BoundedWindow> 
windowingStrategy =
-                                       (WindowingStrategy<OUT, ? extends 
BoundedWindow>)
-                                                       
context.getOutput(transform).getWindowingStrategy();
-
-                       WindowedValue.WindowedValueCoder<OUT> outputStreamCoder 
= WindowedValue.getFullCoder(output.getCoder(),
-                                       
windowingStrategy.getWindowFn().windowCoder());
-                       CoderTypeInformation<WindowedValue<OUT>> 
outputWindowedValueCoder =
-                                       new 
CoderTypeInformation<>(outputStreamCoder);
-
-                       FlinkParDoBoundWrapper<IN, OUT> doFnWrapper = new 
FlinkParDoBoundWrapper<>(
-                                       context.getPipelineOptions(), 
windowingStrategy, transform.getFn());
-                       DataStream<WindowedValue<IN>> inputDataStream = 
context.getInputDataStream(context.getInput(transform));
-                       SingleOutputStreamOperator<WindowedValue<OUT>> 
outDataStream = inputDataStream.flatMap(doFnWrapper)
-                                       .returns(outputWindowedValueCoder);
-
-                       
context.setOutputDataStream(context.getOutput(transform), outDataStream);
-               }
-       }
-
-       public static class WindowBoundTranslator<T> implements 
FlinkStreamingPipelineTranslator.StreamTransformTranslator<Window.Bound<T>> {
-
-               @Override
-               public void translateNode(Window.Bound<T> transform, 
FlinkStreamingTranslationContext context) {
-                       PValue input = context.getInput(transform);
-                       DataStream<WindowedValue<T>> inputDataStream = 
context.getInputDataStream(input);
-
-                       final WindowingStrategy<T, ? extends BoundedWindow> 
windowingStrategy =
-                                       (WindowingStrategy<T, ? extends 
BoundedWindow>)
-                                                       
context.getOutput(transform).getWindowingStrategy();
-
-                       final WindowFn<T, ? extends BoundedWindow> windowFn = 
windowingStrategy.getWindowFn();
-
-                       WindowedValue.WindowedValueCoder<T> outputStreamCoder = 
WindowedValue.getFullCoder(
-                                       context.getInput(transform).getCoder(), 
windowingStrategy.getWindowFn().windowCoder());
-                       CoderTypeInformation<WindowedValue<T>> 
outputWindowedValueCoder =
-                                       new 
CoderTypeInformation<>(outputStreamCoder);
-
-                       final FlinkParDoBoundWrapper<T, T> windowDoFnAssigner = 
new FlinkParDoBoundWrapper<>(
-                                       context.getPipelineOptions(), 
windowingStrategy, createWindowAssigner(windowFn));
-
-                       SingleOutputStreamOperator<WindowedValue<T>> 
windowedStream =
-                                       
inputDataStream.flatMap(windowDoFnAssigner).returns(outputWindowedValueCoder);
-                       
context.setOutputDataStream(context.getOutput(transform), windowedStream);
-               }
-
-               private static <T, W extends BoundedWindow> DoFn<T, T> 
createWindowAssigner(final WindowFn<T, W> windowFn) {
-                       return new DoFn<T, T>() {
-
-                               @Override
-                               public void processElement(final ProcessContext 
c) throws Exception {
-                                       Collection<W> windows = 
windowFn.assignWindows(
-                                                       windowFn.new 
AssignContext() {
-                                                               @Override
-                                                               public T 
element() {
-                                                                       return 
c.element();
-                                                               }
-
-                                                               @Override
-                                                               public Instant 
timestamp() {
-                                                                       return 
c.timestamp();
-                                                               }
-
-                                                               @Override
-                                                               public 
Collection<? extends BoundedWindow> windows() {
-                                                                       return 
c.windowingInternals().windows();
-                                                               }
-                                                       });
-
-                                       
c.windowingInternals().outputWindowedValue(
-                                                       c.element(), 
c.timestamp(), windows, c.pane());
-                               }
-                       };
-               }
-       }
-
-       public static class GroupByKeyTranslator<K, V> implements 
FlinkStreamingPipelineTranslator.StreamTransformTranslator<GroupByKey<K, V>> {
-
-               @Override
-               public void translateNode(GroupByKey<K, V> transform, 
FlinkStreamingTranslationContext context) {
-                       PValue input = context.getInput(transform);
-
-                       DataStream<WindowedValue<KV<K, V>>> inputDataStream = 
context.getInputDataStream(input);
-                       KvCoder<K, V> inputKvCoder = (KvCoder<K, V>) 
context.getInput(transform).getCoder();
-
-                       KeyedStream<WindowedValue<KV<K, V>>, K> groupByKStream 
= FlinkGroupByKeyWrapper
-                                       .groupStreamByKey(inputDataStream, 
inputKvCoder);
-
-                       DataStream<WindowedValue<KV<K, Iterable<V>>>> 
groupedByKNWstream =
-                                       
FlinkGroupAlsoByWindowWrapper.createForIterable(context.getPipelineOptions(),
-                                                       
context.getInput(transform), groupByKStream);
-
-                       
context.setOutputDataStream(context.getOutput(transform), groupedByKNWstream);
-               }
-       }
-
-       public static class CombinePerKeyTranslator<K, VIN, VACC, VOUT> 
implements 
FlinkStreamingPipelineTranslator.StreamTransformTranslator<Combine.PerKey<K, 
VIN, VOUT>> {
-
-               @Override
-               public void translateNode(Combine.PerKey<K, VIN, VOUT> 
transform, FlinkStreamingTranslationContext context) {
-                       PValue input = context.getInput(transform);
-
-                       DataStream<WindowedValue<KV<K, VIN>>> inputDataStream = 
context.getInputDataStream(input);
-                       KvCoder<K, VIN> inputKvCoder = (KvCoder<K, VIN>) 
context.getInput(transform).getCoder();
-                       KvCoder<K, VOUT> outputKvCoder = (KvCoder<K, VOUT>) 
context.getOutput(transform).getCoder();
-
-                       KeyedStream<WindowedValue<KV<K, VIN>>, K> 
groupByKStream = FlinkGroupByKeyWrapper
-                                       .groupStreamByKey(inputDataStream, 
inputKvCoder);
-
-                       Combine.KeyedCombineFn<K, VIN, VACC, VOUT> combineFn = 
(Combine.KeyedCombineFn<K, VIN, VACC, VOUT>) transform.getFn();
-                       DataStream<WindowedValue<KV<K, VOUT>>> 
groupedByKNWstream =
-                                       
FlinkGroupAlsoByWindowWrapper.create(context.getPipelineOptions(),
-                                                       
context.getInput(transform), groupByKStream, combineFn, outputKvCoder);
-
-                       
context.setOutputDataStream(context.getOutput(transform), groupedByKNWstream);
-               }
-       }
-
-       public static class FlattenPCollectionTranslator<T> implements 
FlinkStreamingPipelineTranslator.StreamTransformTranslator<Flatten.FlattenPCollectionList<T>>
 {
-
-               @Override
-               public void translateNode(Flatten.FlattenPCollectionList<T> 
transform, FlinkStreamingTranslationContext context) {
-                       List<PCollection<T>> allInputs = 
context.getInput(transform).getAll();
-                       DataStream<T> result = null;
-                       for (PCollection<T> collection : allInputs) {
-                               DataStream<T> current = 
context.getInputDataStream(collection);
-                               result = (result == null) ? current : 
result.union(current);
-                       }
-                       
context.setOutputDataStream(context.getOutput(transform), result);
-               }
-       }
-
-       public static class ParDoBoundMultiStreamingTranslator<IN, OUT> 
implements 
FlinkStreamingPipelineTranslator.StreamTransformTranslator<ParDo.BoundMulti<IN, 
OUT>> {
-
-               private final int MAIN_TAG_INDEX = 0;
-
-               @Override
-               public void translateNode(ParDo.BoundMulti<IN, OUT> transform, 
FlinkStreamingTranslationContext context) {
-
-                       // we assume that the transformation does not change 
the windowing strategy.
-                       WindowingStrategy<?, ? extends BoundedWindow> 
windowingStrategy = context.getInput(transform).getWindowingStrategy();
-
-                       Map<TupleTag<?>, PCollection<?>> outputs = 
context.getOutput(transform).getAll();
-                       Map<TupleTag<?>, Integer> tagsToLabels = 
transformTupleTagsToLabels(
-                                       transform.getMainOutputTag(), 
outputs.keySet());
-
-                       UnionCoder intermUnionCoder = 
getIntermUnionCoder(outputs.values());
-                       WindowedValue.WindowedValueCoder<RawUnionValue> 
outputStreamCoder = WindowedValue.getFullCoder(
-                                       intermUnionCoder, 
windowingStrategy.getWindowFn().windowCoder());
-
-                       CoderTypeInformation<WindowedValue<RawUnionValue>> 
intermWindowedValueCoder =
-                                       new 
CoderTypeInformation<>(outputStreamCoder);
-
-                       FlinkParDoBoundMultiWrapper<IN, OUT> doFnWrapper = new 
FlinkParDoBoundMultiWrapper<>(
-                                       context.getPipelineOptions(), 
windowingStrategy, transform.getFn(),
-                                       transform.getMainOutputTag(), 
tagsToLabels);
-
-                       DataStream<WindowedValue<IN>> inputDataStream = 
context.getInputDataStream(context.getInput(transform));
-                       
SingleOutputStreamOperator<WindowedValue<RawUnionValue>> intermDataStream =
-                                       
inputDataStream.flatMap(doFnWrapper).returns(intermWindowedValueCoder);
-
-                       for (Map.Entry<TupleTag<?>, PCollection<?>> output : 
outputs.entrySet()) {
-                               final int outputTag = 
tagsToLabels.get(output.getKey());
-
-                               WindowedValue.WindowedValueCoder<?> coderForTag 
= WindowedValue.getFullCoder(
-                                               output.getValue().getCoder(),
-                                               
windowingStrategy.getWindowFn().windowCoder());
-
-                               CoderTypeInformation<WindowedValue<?>> 
windowedValueCoder =
-                                               new 
CoderTypeInformation(coderForTag);
-
-                               context.setOutputDataStream(output.getValue(),
-                                               intermDataStream.filter(new 
FilterFunction<WindowedValue<RawUnionValue>>() {
-                                                       @Override
-                                                       public boolean 
filter(WindowedValue<RawUnionValue> value) throws Exception {
-                                                               return 
value.getValue().getUnionTag() == outputTag;
-                                                       }
-                                               }).flatMap(new 
FlatMapFunction<WindowedValue<RawUnionValue>, WindowedValue<?>>() {
-                                                       @Override
-                                                       public void 
flatMap(WindowedValue<RawUnionValue> value, Collector<WindowedValue<?>> 
collector) throws Exception {
-                                                               
collector.collect(WindowedValue.of(
-                                                                               
value.getValue().getValue(),
-                                                                               
value.getTimestamp(),
-                                                                               
value.getWindows(),
-                                                                               
value.getPane()));
-                                                       }
-                                               }).returns(windowedValueCoder));
-                       }
-               }
-
-               private Map<TupleTag<?>, Integer> 
transformTupleTagsToLabels(TupleTag<?> mainTag, Set<TupleTag<?>> secondaryTags) 
{
-                       Map<TupleTag<?>, Integer> tagToLabelMap = 
Maps.newHashMap();
-                       tagToLabelMap.put(mainTag, MAIN_TAG_INDEX);
-                       int count = MAIN_TAG_INDEX + 1;
-                       for (TupleTag<?> tag : secondaryTags) {
-                               if (!tagToLabelMap.containsKey(tag)) {
-                                       tagToLabelMap.put(tag, count++);
-                               }
-                       }
-                       return tagToLabelMap;
-               }
-
-               private UnionCoder 
getIntermUnionCoder(Collection<PCollection<?>> taggedCollections) {
-                       List<Coder<?>> outputCoders = Lists.newArrayList();
-                       for (PCollection<?> coll : taggedCollections) {
-                               outputCoders.add(coll.getCoder());
-                       }
-                       return UnionCoder.of(outputCoders);
-               }
-       }
+  // 
--------------------------------------------------------------------------------------------
+  //  Transform Translator Registry
+  // 
--------------------------------------------------------------------------------------------
+
+  @SuppressWarnings("rawtypes")
+  private static final Map<Class<? extends PTransform>, 
FlinkStreamingPipelineTranslator.StreamTransformTranslator> TRANSLATORS = new 
HashMap<>();
+
+  // here you can find all the available translators.
+  static {
+    TRANSLATORS.put(Create.Values.class, new CreateStreamingTranslator());
+    TRANSLATORS.put(Read.Unbounded.class, new UnboundedReadSourceTranslator());
+    TRANSLATORS.put(ParDo.Bound.class, new ParDoBoundStreamingTranslator());
+    TRANSLATORS.put(TextIO.Write.Bound.class, new 
TextIOWriteBoundStreamingTranslator());
+    TRANSLATORS.put(Window.Bound.class, new WindowBoundTranslator());
+    TRANSLATORS.put(GroupByKey.class, new GroupByKeyTranslator());
+    TRANSLATORS.put(Combine.PerKey.class, new CombinePerKeyTranslator());
+    TRANSLATORS.put(Flatten.FlattenPCollectionList.class, new 
FlattenPCollectionTranslator());
+    TRANSLATORS.put(ParDo.BoundMulti.class, new 
ParDoBoundMultiStreamingTranslator());
+  }
+
+  public static FlinkStreamingPipelineTranslator.StreamTransformTranslator<?> 
getTranslator(PTransform<?, ?> transform) {
+    return TRANSLATORS.get(transform.getClass());
+  }
+
+  // 
--------------------------------------------------------------------------------------------
+  //  Transformation Implementations
+  // 
--------------------------------------------------------------------------------------------
+
+  private static class CreateStreamingTranslator<OUT> implements
+      
FlinkStreamingPipelineTranslator.StreamTransformTranslator<Create.Values<OUT>> {
+
+    @Override
+    public void translateNode(Create.Values<OUT> transform, 
FlinkStreamingTranslationContext context) {
+      PCollection<OUT> output = context.getOutput(transform);
+      Iterable<OUT> elements = transform.getElements();
+
+      // we need to serialize the elements to byte arrays, since they might 
contain
+      // elements that are not serializable by Java serialization. We 
deserialize them
+      // in the FlatMap function using the Coder.
+
+      List<byte[]> serializedElements = Lists.newArrayList();
+      Coder<OUT> elementCoder = context.getOutput(transform).getCoder();
+      for (OUT element: elements) {
+        ByteArrayOutputStream bao = new ByteArrayOutputStream();
+        try {
+          elementCoder.encode(element, bao, Coder.Context.OUTER);
+          serializedElements.add(bao.toByteArray());
+        } catch (IOException e) {
+          throw new RuntimeException("Could not serialize Create elements 
using Coder: " + e);
+        }
+      }
+
+
+      DataStream<Integer> initDataSet = 
context.getExecutionEnvironment().fromElements(1);
+
+      FlinkStreamingCreateFunction<Integer, OUT> createFunction =
+          new FlinkStreamingCreateFunction<>(serializedElements, elementCoder);
+
+      WindowedValue.ValueOnlyWindowedValueCoder<OUT> windowCoder = 
WindowedValue.getValueOnlyCoder(elementCoder);
+      TypeInformation<WindowedValue<OUT>> outputType = new 
CoderTypeInformation<>(windowCoder);
+
+      DataStream<WindowedValue<OUT>> outputDataStream = 
initDataSet.flatMap(createFunction)
+          .returns(outputType);
+
+      context.setOutputDataStream(context.getOutput(transform), 
outputDataStream);
+    }
+  }
+
+
+  private static class TextIOWriteBoundStreamingTranslator<T> implements 
FlinkStreamingPipelineTranslator.StreamTransformTranslator<TextIO.Write.Bound<T>>
 {
+    private static final Logger LOG = 
LoggerFactory.getLogger(TextIOWriteBoundStreamingTranslator.class);
+
+    @Override
+    public void translateNode(TextIO.Write.Bound<T> transform, 
FlinkStreamingTranslationContext context) {
+      PValue input = context.getInput(transform);
+      DataStream<WindowedValue<T>> inputDataStream = 
context.getInputDataStream(input);
+
+      String filenamePrefix = transform.getFilenamePrefix();
+      String filenameSuffix = transform.getFilenameSuffix();
+      boolean needsValidation = transform.needsValidation();
+      int numShards = transform.getNumShards();
+      String shardNameTemplate = transform.getShardNameTemplate();
+
+      // TODO: Implement these. We need Flink support for this.
+      LOG.warn("Translation of TextIO.Write.needsValidation not yet supported. 
Is: {}.", needsValidation);
+      LOG.warn("Translation of TextIO.Write.filenameSuffix not yet supported. 
Is: {}.", filenameSuffix);
+      LOG.warn("Translation of TextIO.Write.shardNameTemplate not yet 
supported. Is: {}.", shardNameTemplate);
+
+      DataStream<String> dataSink = inputDataStream.flatMap(new 
FlatMapFunction<WindowedValue<T>, String>() {
+        @Override
+        public void flatMap(WindowedValue<T> value, Collector<String> out) 
throws Exception {
+          out.collect(value.getValue().toString());
+        }
+      });
+      DataStreamSink<String> output = dataSink.writeAsText(filenamePrefix, 
FileSystem.WriteMode.OVERWRITE);
+
+      if (numShards > 0) {
+        output.setParallelism(numShards);
+      }
+    }
+  }
+
+  private static class UnboundedReadSourceTranslator<T> implements 
FlinkStreamingPipelineTranslator.StreamTransformTranslator<Read.Unbounded<T>> {
+
+    @Override
+    public void translateNode(Read.Unbounded<T> transform, 
FlinkStreamingTranslationContext context) {
+      PCollection<T> output = context.getOutput(transform);
+
+      DataStream<WindowedValue<T>> source;
+      if (transform.getSource().getClass().equals(UnboundedFlinkSource.class)) 
{
+        UnboundedFlinkSource flinkSource = (UnboundedFlinkSource) 
transform.getSource();
+        source = context.getExecutionEnvironment()
+            .addSource(flinkSource.getFlinkSource())
+            .flatMap(new FlatMapFunction<String, WindowedValue<String>>() {
+              @Override
+              public void flatMap(String s, Collector<WindowedValue<String>> 
collector) throws Exception {
+                collector.collect(WindowedValue.<String>of(s, Instant.now(), 
GlobalWindow.INSTANCE, PaneInfo.NO_FIRING));
+              }
+            });
+      } else {
+        source = context.getExecutionEnvironment()
+            .addSource(new 
UnboundedSourceWrapper<>(context.getPipelineOptions(), transform));
+      }
+      context.setOutputDataStream(output, source);
+    }
+  }
+
+  private static class ParDoBoundStreamingTranslator<IN, OUT> implements 
FlinkStreamingPipelineTranslator.StreamTransformTranslator<ParDo.Bound<IN, 
OUT>> {
+
+    @Override
+    public void translateNode(ParDo.Bound<IN, OUT> transform, 
FlinkStreamingTranslationContext context) {
+      PCollection<OUT> output = context.getOutput(transform);
+
+      final WindowingStrategy<OUT, ? extends BoundedWindow> windowingStrategy =
+          (WindowingStrategy<OUT, ? extends BoundedWindow>)
+              context.getOutput(transform).getWindowingStrategy();
+
+      WindowedValue.WindowedValueCoder<OUT> outputStreamCoder = 
WindowedValue.getFullCoder(output.getCoder(),
+          windowingStrategy.getWindowFn().windowCoder());
+      CoderTypeInformation<WindowedValue<OUT>> outputWindowedValueCoder =
+          new CoderTypeInformation<>(outputStreamCoder);
+
+      FlinkParDoBoundWrapper<IN, OUT> doFnWrapper = new 
FlinkParDoBoundWrapper<>(
+          context.getPipelineOptions(), windowingStrategy, transform.getFn());
+      DataStream<WindowedValue<IN>> inputDataStream = 
context.getInputDataStream(context.getInput(transform));
+      SingleOutputStreamOperator<WindowedValue<OUT>> outDataStream = 
inputDataStream.flatMap(doFnWrapper)
+          .returns(outputWindowedValueCoder);
+
+      context.setOutputDataStream(context.getOutput(transform), outDataStream);
+    }
+  }
+
+  public static class WindowBoundTranslator<T> implements 
FlinkStreamingPipelineTranslator.StreamTransformTranslator<Window.Bound<T>> {
+
+    @Override
+    public void translateNode(Window.Bound<T> transform, 
FlinkStreamingTranslationContext context) {
+      PValue input = context.getInput(transform);
+      DataStream<WindowedValue<T>> inputDataStream = 
context.getInputDataStream(input);
+
+      final WindowingStrategy<T, ? extends BoundedWindow> windowingStrategy =
+          (WindowingStrategy<T, ? extends BoundedWindow>)
+              context.getOutput(transform).getWindowingStrategy();
+
+      final WindowFn<T, ? extends BoundedWindow> windowFn = 
windowingStrategy.getWindowFn();
+
+      WindowedValue.WindowedValueCoder<T> outputStreamCoder = 
WindowedValue.getFullCoder(
+          context.getInput(transform).getCoder(), 
windowingStrategy.getWindowFn().windowCoder());
+      CoderTypeInformation<WindowedValue<T>> outputWindowedValueCoder =
+          new CoderTypeInformation<>(outputStreamCoder);
+
+      final FlinkParDoBoundWrapper<T, T> windowDoFnAssigner = new 
FlinkParDoBoundWrapper<>(
+          context.getPipelineOptions(), windowingStrategy, 
createWindowAssigner(windowFn));
+
+      SingleOutputStreamOperator<WindowedValue<T>> windowedStream =
+          
inputDataStream.flatMap(windowDoFnAssigner).returns(outputWindowedValueCoder);
+      context.setOutputDataStream(context.getOutput(transform), 
windowedStream);
+    }
+
+    private static <T, W extends BoundedWindow> DoFn<T, T> 
createWindowAssigner(final WindowFn<T, W> windowFn) {
+      return new DoFn<T, T>() {
+
+        @Override
+        public void processElement(final ProcessContext c) throws Exception {
+          Collection<W> windows = windowFn.assignWindows(
+              windowFn.new AssignContext() {
+                @Override
+                public T element() {
+                  return c.element();
+                }
+
+                @Override
+                public Instant timestamp() {
+                  return c.timestamp();
+                }
+
+                @Override
+                public Collection<? extends BoundedWindow> windows() {
+                  return c.windowingInternals().windows();
+                }
+              });
+
+          c.windowingInternals().outputWindowedValue(
+              c.element(), c.timestamp(), windows, c.pane());
+        }
+      };
+    }
+  }
+
+  public static class GroupByKeyTranslator<K, V> implements 
FlinkStreamingPipelineTranslator.StreamTransformTranslator<GroupByKey<K, V>> {
+
+    @Override
+    public void translateNode(GroupByKey<K, V> transform, 
FlinkStreamingTranslationContext context) {
+      PValue input = context.getInput(transform);
+
+      DataStream<WindowedValue<KV<K, V>>> inputDataStream = 
context.getInputDataStream(input);
+      KvCoder<K, V> inputKvCoder = (KvCoder<K, V>) 
context.getInput(transform).getCoder();
+
+      KeyedStream<WindowedValue<KV<K, V>>, K> groupByKStream = 
FlinkGroupByKeyWrapper
+          .groupStreamByKey(inputDataStream, inputKvCoder);
+
+      DataStream<WindowedValue<KV<K, Iterable<V>>>> groupedByKNWstream =
+          
FlinkGroupAlsoByWindowWrapper.createForIterable(context.getPipelineOptions(),
+              context.getInput(transform), groupByKStream);
+
+      context.setOutputDataStream(context.getOutput(transform), 
groupedByKNWstream);
+    }
+  }
+
+  public static class CombinePerKeyTranslator<K, VIN, VACC, VOUT> implements 
FlinkStreamingPipelineTranslator.StreamTransformTranslator<Combine.PerKey<K, 
VIN, VOUT>> {
+
+    @Override
+    public void translateNode(Combine.PerKey<K, VIN, VOUT> transform, 
FlinkStreamingTranslationContext context) {
+      PValue input = context.getInput(transform);
+
+      DataStream<WindowedValue<KV<K, VIN>>> inputDataStream = 
context.getInputDataStream(input);
+      KvCoder<K, VIN> inputKvCoder = (KvCoder<K, VIN>) 
context.getInput(transform).getCoder();
+      KvCoder<K, VOUT> outputKvCoder = (KvCoder<K, VOUT>) 
context.getOutput(transform).getCoder();
+
+      KeyedStream<WindowedValue<KV<K, VIN>>, K> groupByKStream = 
FlinkGroupByKeyWrapper
+          .groupStreamByKey(inputDataStream, inputKvCoder);
+
+      Combine.KeyedCombineFn<K, VIN, VACC, VOUT> combineFn = 
(Combine.KeyedCombineFn<K, VIN, VACC, VOUT>) transform.getFn();
+      DataStream<WindowedValue<KV<K, VOUT>>> groupedByKNWstream =
+          FlinkGroupAlsoByWindowWrapper.create(context.getPipelineOptions(),
+              context.getInput(transform), groupByKStream, combineFn, 
outputKvCoder);
+
+      context.setOutputDataStream(context.getOutput(transform), 
groupedByKNWstream);
+    }
+  }
+
+  public static class FlattenPCollectionTranslator<T> implements 
FlinkStreamingPipelineTranslator.StreamTransformTranslator<Flatten.FlattenPCollectionList<T>>
 {
+
+    @Override
+    public void translateNode(Flatten.FlattenPCollectionList<T> transform, 
FlinkStreamingTranslationContext context) {
+      List<PCollection<T>> allInputs = context.getInput(transform).getAll();
+      DataStream<T> result = null;
+      for (PCollection<T> collection : allInputs) {
+        DataStream<T> current = context.getInputDataStream(collection);
+        result = (result == null) ? current : result.union(current);
+      }
+      context.setOutputDataStream(context.getOutput(transform), result);
+    }
+  }
+
+  public static class ParDoBoundMultiStreamingTranslator<IN, OUT> implements 
FlinkStreamingPipelineTranslator.StreamTransformTranslator<ParDo.BoundMulti<IN, 
OUT>> {
+
+    private final int MAIN_TAG_INDEX = 0;
+
+    @Override
+    public void translateNode(ParDo.BoundMulti<IN, OUT> transform, 
FlinkStreamingTranslationContext context) {
+
+      // we assume that the transformation does not change the windowing 
strategy.
+      WindowingStrategy<?, ? extends BoundedWindow> windowingStrategy = 
context.getInput(transform).getWindowingStrategy();
+
+      Map<TupleTag<?>, PCollection<?>> outputs = 
context.getOutput(transform).getAll();
+      Map<TupleTag<?>, Integer> tagsToLabels = transformTupleTagsToLabels(
+          transform.getMainOutputTag(), outputs.keySet());
+
+      UnionCoder intermUnionCoder = getIntermUnionCoder(outputs.values());
+      WindowedValue.WindowedValueCoder<RawUnionValue> outputStreamCoder = 
WindowedValue.getFullCoder(
+          intermUnionCoder, windowingStrategy.getWindowFn().windowCoder());
+
+      CoderTypeInformation<WindowedValue<RawUnionValue>> 
intermWindowedValueCoder =
+          new CoderTypeInformation<>(outputStreamCoder);
+
+      FlinkParDoBoundMultiWrapper<IN, OUT> doFnWrapper = new 
FlinkParDoBoundMultiWrapper<>(
+          context.getPipelineOptions(), windowingStrategy, transform.getFn(),
+          transform.getMainOutputTag(), tagsToLabels);
+
+      DataStream<WindowedValue<IN>> inputDataStream = 
context.getInputDataStream(context.getInput(transform));
+      SingleOutputStreamOperator<WindowedValue<RawUnionValue>> 
intermDataStream =
+          
inputDataStream.flatMap(doFnWrapper).returns(intermWindowedValueCoder);
+
+      for (Map.Entry<TupleTag<?>, PCollection<?>> output : outputs.entrySet()) 
{
+        final int outputTag = tagsToLabels.get(output.getKey());
+
+        WindowedValue.WindowedValueCoder<?> coderForTag = 
WindowedValue.getFullCoder(
+            output.getValue().getCoder(),
+            windowingStrategy.getWindowFn().windowCoder());
+
+        CoderTypeInformation<WindowedValue<?>> windowedValueCoder =
+            new CoderTypeInformation(coderForTag);
+
+        context.setOutputDataStream(output.getValue(),
+            intermDataStream.filter(new 
FilterFunction<WindowedValue<RawUnionValue>>() {
+              @Override
+              public boolean filter(WindowedValue<RawUnionValue> value) throws 
Exception {
+                return value.getValue().getUnionTag() == outputTag;
+              }
+            }).flatMap(new FlatMapFunction<WindowedValue<RawUnionValue>, 
WindowedValue<?>>() {
+              @Override
+              public void flatMap(WindowedValue<RawUnionValue> value, 
Collector<WindowedValue<?>> collector) throws Exception {
+                collector.collect(WindowedValue.of(
+                    value.getValue().getValue(),
+                    value.getTimestamp(),
+                    value.getWindows(),
+                    value.getPane()));
+              }
+            }).returns(windowedValueCoder));
+      }
+    }
+
+    private Map<TupleTag<?>, Integer> transformTupleTagsToLabels(TupleTag<?> 
mainTag, Set<TupleTag<?>> secondaryTags) {
+      Map<TupleTag<?>, Integer> tagToLabelMap = Maps.newHashMap();
+      tagToLabelMap.put(mainTag, MAIN_TAG_INDEX);
+      int count = MAIN_TAG_INDEX + 1;
+      for (TupleTag<?> tag : secondaryTags) {
+        if (!tagToLabelMap.containsKey(tag)) {
+          tagToLabelMap.put(tag, count++);
+        }
+      }
+      return tagToLabelMap;
+    }
+
+    private UnionCoder getIntermUnionCoder(Collection<PCollection<?>> 
taggedCollections) {
+      List<Coder<?>> outputCoders = Lists.newArrayList();
+      for (PCollection<?> coll : taggedCollections) {
+        outputCoders.add(coll.getCoder());
+      }
+      return UnionCoder.of(outputCoders);
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/8852eb15/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/FlinkStreamingTranslationContext.java
----------------------------------------------------------------------
diff --git 
a/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/FlinkStreamingTranslationContext.java
 
b/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/FlinkStreamingTranslationContext.java
index 7c4ab93..3586d0c 100644
--- 
a/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/FlinkStreamingTranslationContext.java
+++ 
b/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/FlinkStreamingTranslationContext.java
@@ -30,58 +30,58 @@ import java.util.Map;
 
 public class FlinkStreamingTranslationContext {
 
-       private final StreamExecutionEnvironment env;
-       private final PipelineOptions options;
+  private final StreamExecutionEnvironment env;
+  private final PipelineOptions options;
 
-       /**
-        * Keeps a mapping between the output value of the PTransform (in 
Dataflow) and the
-        * Flink Operator that produced it, after the translation of the 
correspondinf PTransform
-        * to its Flink equivalent.
-        * */
-       private final Map<PValue, DataStream<?>> dataStreams;
+  /**
+   * Keeps a mapping between the output value of the PTransform (in Dataflow) 
and the
+   * Flink Operator that produced it, after the translation of the 
corresponding PTransform
+   * to its Flink equivalent.
+   * */
+  private final Map<PValue, DataStream<?>> dataStreams;
 
-       private AppliedPTransform<?, ?, ?> currentTransform;
+  private AppliedPTransform<?, ?, ?> currentTransform;
 
-       public FlinkStreamingTranslationContext(StreamExecutionEnvironment env, 
PipelineOptions options) {
-               this.env = Preconditions.checkNotNull(env);
-               this.options = Preconditions.checkNotNull(options);
-               this.dataStreams = new HashMap<>();
-       }
+  public FlinkStreamingTranslationContext(StreamExecutionEnvironment env, 
PipelineOptions options) {
+    this.env = Preconditions.checkNotNull(env);
+    this.options = Preconditions.checkNotNull(options);
+    this.dataStreams = new HashMap<>();
+  }
 
-       public StreamExecutionEnvironment getExecutionEnvironment() {
-               return env;
-       }
+  public StreamExecutionEnvironment getExecutionEnvironment() {
+    return env;
+  }
 
-       public PipelineOptions getPipelineOptions() {
-               return options;
-       }
+  public PipelineOptions getPipelineOptions() {
+    return options;
+  }
 
-       @SuppressWarnings("unchecked")
-       public <T> DataStream<T> getInputDataStream(PValue value) {
-               return (DataStream<T>) dataStreams.get(value);
-       }
+  @SuppressWarnings("unchecked")
+  public <T> DataStream<T> getInputDataStream(PValue value) {
+    return (DataStream<T>) dataStreams.get(value);
+  }
 
-       public void setOutputDataStream(PValue value, DataStream<?> set) {
-               if (!dataStreams.containsKey(value)) {
-                       dataStreams.put(value, set);
-               }
-       }
+  public void setOutputDataStream(PValue value, DataStream<?> set) {
+    if (!dataStreams.containsKey(value)) {
+      dataStreams.put(value, set);
+    }
+  }
 
-       /**
-        * Sets the AppliedPTransform which carries input/output.
-        * @param currentTransform
-        */
-       public void setCurrentTransform(AppliedPTransform<?, ?, ?> 
currentTransform) {
-               this.currentTransform = currentTransform;
-       }
+  /**
+   * Sets the AppliedPTransform which carries input/output.
+   * @param currentTransform
+   */
+  public void setCurrentTransform(AppliedPTransform<?, ?, ?> currentTransform) 
{
+    this.currentTransform = currentTransform;
+  }
 
-       @SuppressWarnings("unchecked")
-       public <I extends PInput> I getInput(PTransform<I, ?> transform) {
-               return (I) currentTransform.getInput();
-       }
+  @SuppressWarnings("unchecked")
+  public <I extends PInput> I getInput(PTransform<I, ?> transform) {
+    return (I) currentTransform.getInput();
+  }
 
-       @SuppressWarnings("unchecked")
-       public <O extends POutput> O getOutput(PTransform<?, O> transform) {
-               return (O) currentTransform.getOutput();
-       }
+  @SuppressWarnings("unchecked")
+  public <O extends POutput> O getOutput(PTransform<?, O> transform) {
+    return (O) currentTransform.getOutput();
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/8852eb15/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/functions/FlinkCoGroupKeyedListAggregator.java
----------------------------------------------------------------------
diff --git 
a/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/functions/FlinkCoGroupKeyedListAggregator.java
 
b/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/functions/FlinkCoGroupKeyedListAggregator.java
index 4c7fefd..5897473 100644
--- 
a/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/functions/FlinkCoGroupKeyedListAggregator.java
+++ 
b/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/functions/FlinkCoGroupKeyedListAggregator.java
@@ -29,30 +29,30 @@ import java.util.List;
 
 public class FlinkCoGroupKeyedListAggregator<K,V1,V2> implements 
CoGroupFunction<KV<K,V1>, KV<K,V2>, KV<K, CoGbkResult>>{
 
-       private CoGbkResultSchema schema;
-       private TupleTag<?> tupleTag1;
-       private TupleTag<?> tupleTag2;
+  private CoGbkResultSchema schema;
+  private TupleTag<?> tupleTag1;
+  private TupleTag<?> tupleTag2;
 
-       public FlinkCoGroupKeyedListAggregator(CoGbkResultSchema schema, 
TupleTag<?> tupleTag1, TupleTag<?> tupleTag2) {
-               this.schema = schema;
-               this.tupleTag1 = tupleTag1;
-               this.tupleTag2 = tupleTag2;
-       }
+  public FlinkCoGroupKeyedListAggregator(CoGbkResultSchema schema, TupleTag<?> 
tupleTag1, TupleTag<?> tupleTag2) {
+    this.schema = schema;
+    this.tupleTag1 = tupleTag1;
+    this.tupleTag2 = tupleTag2;
+  }
 
-       @Override
-       public void coGroup(Iterable<KV<K,V1>> first, Iterable<KV<K,V2>> 
second, Collector<KV<K, CoGbkResult>> out) throws Exception {
-               K k = null;
-               List<RawUnionValue> result = new ArrayList<>();
-               int index1 = schema.getIndex(tupleTag1);
-               for (KV<K,?> entry : first) {
-                       k = entry.getKey();
-                       result.add(new RawUnionValue(index1, entry.getValue()));
-               }
-               int index2 = schema.getIndex(tupleTag2);
-               for (KV<K,?> entry : second) {
-                       k = entry.getKey();
-                       result.add(new RawUnionValue(index2, entry.getValue()));
-               }
-               out.collect(KV.of(k, new CoGbkResult(schema, result)));
-       }
+  @Override
+  public void coGroup(Iterable<KV<K,V1>> first, Iterable<KV<K,V2>> second, 
Collector<KV<K, CoGbkResult>> out) throws Exception {
+    K k = null;
+    List<RawUnionValue> result = new ArrayList<>();
+    int index1 = schema.getIndex(tupleTag1);
+    for (KV<K,?> entry : first) {
+      k = entry.getKey();
+      result.add(new RawUnionValue(index1, entry.getValue()));
+    }
+    int index2 = schema.getIndex(tupleTag2);
+    for (KV<K,?> entry : second) {
+      k = entry.getKey();
+      result.add(new RawUnionValue(index2, entry.getValue()));
+    }
+    out.collect(KV.of(k, new CoGbkResult(schema, result)));
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/8852eb15/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/functions/FlinkCreateFunction.java
----------------------------------------------------------------------
diff --git 
a/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/functions/FlinkCreateFunction.java
 
b/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/functions/FlinkCreateFunction.java
index 21ecaf0..03f2b06 100644
--- 
a/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/functions/FlinkCreateFunction.java
+++ 
b/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/functions/FlinkCreateFunction.java
@@ -32,29 +32,29 @@ import java.util.List;
  */
 public class FlinkCreateFunction<IN, OUT> implements FlatMapFunction<IN, OUT> {
 
-       private final List<byte[]> elements;
-       private final Coder<OUT> coder;
-
-       public FlinkCreateFunction(List<byte[]> elements, Coder<OUT> coder) {
-               this.elements = elements;
-               this.coder = coder;
-       }
-
-       @Override
-       @SuppressWarnings("unchecked")
-       public void flatMap(IN value, Collector<OUT> out) throws Exception {
-
-               for (byte[] element : elements) {
-                       ByteArrayInputStream bai = new 
ByteArrayInputStream(element);
-                       OUT outValue = coder.decode(bai, Coder.Context.OUTER);
-                       if (outValue == null) {
-                               // TODO Flink doesn't allow null values in 
records
-                               out.collect((OUT) 
VoidCoderTypeSerializer.VoidValue.INSTANCE);
-                       } else {
-                               out.collect(outValue);
-                       }
-               }
-
-               out.close();
-       }
+  private final List<byte[]> elements;
+  private final Coder<OUT> coder;
+
+  public FlinkCreateFunction(List<byte[]> elements, Coder<OUT> coder) {
+    this.elements = elements;
+    this.coder = coder;
+  }
+
+  @Override
+  @SuppressWarnings("unchecked")
+  public void flatMap(IN value, Collector<OUT> out) throws Exception {
+
+    for (byte[] element : elements) {
+      ByteArrayInputStream bai = new ByteArrayInputStream(element);
+      OUT outValue = coder.decode(bai, Coder.Context.OUTER);
+      if (outValue == null) {
+        // TODO Flink doesn't allow null values in records
+        out.collect((OUT) VoidCoderTypeSerializer.VoidValue.INSTANCE);
+      } else {
+        out.collect(outValue);
+      }
+    }
+
+    out.close();
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/8852eb15/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/functions/FlinkDoFnFunction.java
----------------------------------------------------------------------
diff --git 
a/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/functions/FlinkDoFnFunction.java
 
b/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/functions/FlinkDoFnFunction.java
index 9c57d4e..53ff1cf 100644
--- 
a/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/functions/FlinkDoFnFunction.java
+++ 
b/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/functions/FlinkDoFnFunction.java
@@ -49,154 +49,154 @@ import java.util.List;
  */
 public class FlinkDoFnFunction<IN, OUT> extends RichMapPartitionFunction<IN, 
OUT> {
 
-       private final DoFn<IN, OUT> doFn;
-       private transient PipelineOptions options;
-
-       public FlinkDoFnFunction(DoFn<IN, OUT> doFn, PipelineOptions options) {
-               this.doFn = doFn;
-               this.options = options;
-       }
-
-       private void writeObject(ObjectOutputStream out)
-                       throws IOException, ClassNotFoundException {
-               out.defaultWriteObject();
-               ObjectMapper mapper = new ObjectMapper();
-               mapper.writeValue(out, options);
-       }
-
-       private void readObject(ObjectInputStream in)
-                       throws IOException, ClassNotFoundException {
-               in.defaultReadObject();
-               ObjectMapper mapper = new ObjectMapper();
-               options = mapper.readValue(in, PipelineOptions.class);
-       }
-
-       @Override
-       public void mapPartition(Iterable<IN> values, Collector<OUT> out) 
throws Exception {
-               ProcessContext context = new ProcessContext(doFn, out);
-               this.doFn.startBundle(context);
-               for (IN value : values) {
-                       context.inValue = value;
-                       doFn.processElement(context);
-               }
-               this.doFn.finishBundle(context);
-       }
-       
-       private class ProcessContext extends DoFn<IN, OUT>.ProcessContext {
-
-               IN inValue;
-               Collector<OUT> outCollector;
-
-               public ProcessContext(DoFn<IN, OUT> fn, Collector<OUT> 
outCollector) {
-                       fn.super();
-                       super.setupDelegateAggregators();
-                       this.outCollector = outCollector;
-               }
-
-               @Override
-               public IN element() {
-                       return this.inValue;
-               }
-
-
-               @Override
-               public Instant timestamp() {
-                       return Instant.now();
-               }
-
-               @Override
-               public BoundedWindow window() {
-                       return GlobalWindow.INSTANCE;
-               }
-
-               @Override
-               public PaneInfo pane() {
-                       return PaneInfo.NO_FIRING;
-               }
-
-               @Override
-               public WindowingInternals<IN, OUT> windowingInternals() {
-                       return new WindowingInternals<IN, OUT>() {
-                               @Override
-                               public StateInternals stateInternals() {
-                                       return null;
-                               }
-
-                               @Override
-                               public void outputWindowedValue(OUT output, 
Instant timestamp, Collection<? extends BoundedWindow> windows, PaneInfo pane) {
-
-                               }
-
-                               @Override
-                               public TimerInternals timerInternals() {
-                                       return null;
-                               }
-
-                               @Override
-                               public Collection<? extends BoundedWindow> 
windows() {
-                                       return 
ImmutableList.of(GlobalWindow.INSTANCE);
-                               }
-
-                               @Override
-                               public PaneInfo pane() {
-                                       return PaneInfo.NO_FIRING;
-                               }
-
-                               @Override
-                               public <T> void 
writePCollectionViewData(TupleTag<?> tag, Iterable<WindowedValue<T>> data, 
Coder<T> elemCoder) throws IOException {
-                               }
-
-                               @Override
-                               public <T> T sideInput(PCollectionView<T> view, 
BoundedWindow mainInputWindow) {
-                                       throw new RuntimeException("sideInput() 
not implemented.");
-                               }
-                       };
-               }
-
-               @Override
-               public PipelineOptions getPipelineOptions() {
-                       return options;
-               }
-
-               @Override
-               public <T> T sideInput(PCollectionView<T> view) {
-                       List<T> sideInput = 
getRuntimeContext().getBroadcastVariable(view.getTagInternal().getId());
-                       List<WindowedValue<?>> windowedValueList = new 
ArrayList<>(sideInput.size());
-                       for (T input : sideInput) {
-                               windowedValueList.add(WindowedValue.of(input, 
Instant.now(), ImmutableList.of(GlobalWindow.INSTANCE), pane()));
-                       }
-                       return view.fromIterableInternal(windowedValueList);
-               }
-
-               @Override
-               public void output(OUT output) {
-                       outCollector.collect(output);
-               }
-
-               @Override
-               public void outputWithTimestamp(OUT output, Instant timestamp) {
-                       // not FLink's way, just output normally
-                       output(output);
-               }
-
-               @Override
-               public <T> void sideOutput(TupleTag<T> tag, T output) {
-                       // ignore the side output, this can happen when a user 
does not register
-                       // side outputs but then outputs using a freshly 
created TupleTag.
-               }
-
-               @Override
-               public <T> void sideOutputWithTimestamp(TupleTag<T> tag, T 
output, Instant timestamp) {
-                       sideOutput(tag, output);
-               }
-
-               @Override
-               protected <AggInputT, AggOutputT> Aggregator<AggInputT, 
AggOutputT> createAggregatorInternal(String name, Combine.CombineFn<AggInputT, 
?, AggOutputT> combiner) {
-                       SerializableFnAggregatorWrapper<AggInputT, AggOutputT> 
wrapper = new SerializableFnAggregatorWrapper<>(combiner);
-                       getRuntimeContext().addAccumulator(name, wrapper);
-                       return wrapper;
-               }
-
-
-       }
+  private final DoFn<IN, OUT> doFn;
+  private transient PipelineOptions options;
+
+  public FlinkDoFnFunction(DoFn<IN, OUT> doFn, PipelineOptions options) {
+    this.doFn = doFn;
+    this.options = options;
+  }
+
+  private void writeObject(ObjectOutputStream out)
+      throws IOException, ClassNotFoundException {
+    out.defaultWriteObject();
+    ObjectMapper mapper = new ObjectMapper();
+    mapper.writeValue(out, options);
+  }
+
+  private void readObject(ObjectInputStream in)
+      throws IOException, ClassNotFoundException {
+    in.defaultReadObject();
+    ObjectMapper mapper = new ObjectMapper();
+    options = mapper.readValue(in, PipelineOptions.class);
+  }
+
+  @Override
+  public void mapPartition(Iterable<IN> values, Collector<OUT> out) throws 
Exception {
+    ProcessContext context = new ProcessContext(doFn, out);
+    this.doFn.startBundle(context);
+    for (IN value : values) {
+      context.inValue = value;
+      doFn.processElement(context);
+    }
+    this.doFn.finishBundle(context);
+  }
+  
+  private class ProcessContext extends DoFn<IN, OUT>.ProcessContext {
+
+    IN inValue;
+    Collector<OUT> outCollector;
+
+    public ProcessContext(DoFn<IN, OUT> fn, Collector<OUT> outCollector) {
+      fn.super();
+      super.setupDelegateAggregators();
+      this.outCollector = outCollector;
+    }
+
+    @Override
+    public IN element() {
+      return this.inValue;
+    }
+
+
+    @Override
+    public Instant timestamp() {
+      return Instant.now();
+    }
+
+    @Override
+    public BoundedWindow window() {
+      return GlobalWindow.INSTANCE;
+    }
+
+    @Override
+    public PaneInfo pane() {
+      return PaneInfo.NO_FIRING;
+    }
+
+    @Override
+    public WindowingInternals<IN, OUT> windowingInternals() {
+      return new WindowingInternals<IN, OUT>() {
+        @Override
+        public StateInternals stateInternals() {
+          return null;
+        }
+
+        @Override
+        public void outputWindowedValue(OUT output, Instant timestamp, 
Collection<? extends BoundedWindow> windows, PaneInfo pane) {
+
+        }
+
+        @Override
+        public TimerInternals timerInternals() {
+          return null;
+        }
+
+        @Override
+        public Collection<? extends BoundedWindow> windows() {
+          return ImmutableList.of(GlobalWindow.INSTANCE);
+        }
+
+        @Override
+        public PaneInfo pane() {
+          return PaneInfo.NO_FIRING;
+        }
+
+        @Override
+        public <T> void writePCollectionViewData(TupleTag<?> tag, 
Iterable<WindowedValue<T>> data, Coder<T> elemCoder) throws IOException {
+        }
+
+        @Override
+        public <T> T sideInput(PCollectionView<T> view, BoundedWindow 
mainInputWindow) {
+          throw new RuntimeException("sideInput() not implemented.");
+        }
+      };
+    }
+
+    @Override
+    public PipelineOptions getPipelineOptions() {
+      return options;
+    }
+
+    @Override
+    public <T> T sideInput(PCollectionView<T> view) {
+      List<T> sideInput = 
getRuntimeContext().getBroadcastVariable(view.getTagInternal().getId());
+      List<WindowedValue<?>> windowedValueList = new 
ArrayList<>(sideInput.size());
+      for (T input : sideInput) {
+        windowedValueList.add(WindowedValue.of(input, Instant.now(), 
ImmutableList.of(GlobalWindow.INSTANCE), pane()));
+      }
+      return view.fromIterableInternal(windowedValueList);
+    }
+
+    @Override
+    public void output(OUT output) {
+      outCollector.collect(output);
+    }
+
+    @Override
+    public void outputWithTimestamp(OUT output, Instant timestamp) {
+      // not FLink's way, just output normally
+      output(output);
+    }
+
+    @Override
+    public <T> void sideOutput(TupleTag<T> tag, T output) {
+      // ignore the side output, this can happen when a user does not register
+      // side outputs but then outputs using a freshly created TupleTag.
+    }
+
+    @Override
+    public <T> void sideOutputWithTimestamp(TupleTag<T> tag, T output, Instant 
timestamp) {
+      sideOutput(tag, output);
+    }
+
+    @Override
+    protected <AggInputT, AggOutputT> Aggregator<AggInputT, AggOutputT> 
createAggregatorInternal(String name, Combine.CombineFn<AggInputT, ?, 
AggOutputT> combiner) {
+      SerializableFnAggregatorWrapper<AggInputT, AggOutputT> wrapper = new 
SerializableFnAggregatorWrapper<>(combiner);
+      getRuntimeContext().addAccumulator(name, wrapper);
+      return wrapper;
+    }
+
+
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/8852eb15/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/functions/FlinkKeyedListAggregationFunction.java
----------------------------------------------------------------------
diff --git 
a/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/functions/FlinkKeyedListAggregationFunction.java
 
b/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/functions/FlinkKeyedListAggregationFunction.java
index 5d3702a..0116972 100644
--- 
a/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/functions/FlinkKeyedListAggregationFunction.java
+++ 
b/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/functions/FlinkKeyedListAggregationFunction.java
@@ -29,47 +29,47 @@ import java.util.Iterator;
  */
 public class FlinkKeyedListAggregationFunction<K,V> implements 
GroupReduceFunction<KV<K, V>, KV<K, Iterable<V>>> {
 
-       @Override
-       public void reduce(Iterable<KV<K, V>> values, Collector<KV<K, 
Iterable<V>>> out) throws Exception {
-               Iterator<KV<K, V>> it = values.iterator();
-               KV<K, V> first = it.next();
-               Iterable<V> passThrough = new PassThroughIterable<>(first, it);
-               out.collect(KV.of(first.getKey(), passThrough));
-       }
+  @Override
+  public void reduce(Iterable<KV<K, V>> values, Collector<KV<K, Iterable<V>>> 
out) throws Exception {
+    Iterator<KV<K, V>> it = values.iterator();
+    KV<K, V> first = it.next();
+    Iterable<V> passThrough = new PassThroughIterable<>(first, it);
+    out.collect(KV.of(first.getKey(), passThrough));
+  }
 
-       private static class PassThroughIterable<K, V> implements Iterable<V>, 
Iterator<V>  {
-               private KV<K, V> first;
-               private Iterator<KV<K, V>> iterator;
+  private static class PassThroughIterable<K, V> implements Iterable<V>, 
Iterator<V>  {
+    private KV<K, V> first;
+    private Iterator<KV<K, V>> iterator;
 
-               public PassThroughIterable(KV<K, V> first, Iterator<KV<K, V>> 
iterator) {
-                       this.first = first;
-                       this.iterator = iterator;
-               }
+    public PassThroughIterable(KV<K, V> first, Iterator<KV<K, V>> iterator) {
+      this.first = first;
+      this.iterator = iterator;
+    }
 
-               @Override
-               public Iterator<V> iterator() {
-                       return this;
-               }
+    @Override
+    public Iterator<V> iterator() {
+      return this;
+    }
 
-               @Override
-               public boolean hasNext() {
-                       return first != null || iterator.hasNext();
-               }
+    @Override
+    public boolean hasNext() {
+      return first != null || iterator.hasNext();
+    }
 
-               @Override
-               public V next() {
-                       if (first != null) {
-                               V result = first.getValue();
-                               first = null;
-                               return result;
-                       } else {
-                               return iterator.next().getValue();
-                       }
-               }
+    @Override
+    public V next() {
+      if (first != null) {
+        V result = first.getValue();
+        first = null;
+        return result;
+      } else {
+        return iterator.next().getValue();
+      }
+    }
 
-               @Override
-               public void remove() {
-                       throw new UnsupportedOperationException("Cannot remove 
elements from input.");
-               }
-       }
+    @Override
+    public void remove() {
+      throw new UnsupportedOperationException("Cannot remove elements from 
input.");
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/8852eb15/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/functions/FlinkMultiOutputDoFnFunction.java
----------------------------------------------------------------------
diff --git 
a/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/functions/FlinkMultiOutputDoFnFunction.java
 
b/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/functions/FlinkMultiOutputDoFnFunction.java
index 6187182..9e51638 100644
--- 
a/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/functions/FlinkMultiOutputDoFnFunction.java
+++ 
b/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/functions/FlinkMultiOutputDoFnFunction.java
@@ -51,125 +51,125 @@ import java.util.Map;
  */
 public class FlinkMultiOutputDoFnFunction<IN, OUT> extends 
RichMapPartitionFunction<IN, RawUnionValue> {
 
-       private final DoFn<IN, OUT> doFn;
-       private transient PipelineOptions options;
-       private final Map<TupleTag<?>, Integer> outputMap;
-
-       public FlinkMultiOutputDoFnFunction(DoFn<IN, OUT> doFn, PipelineOptions 
options, Map<TupleTag<?>, Integer> outputMap) {
-               this.doFn = doFn;
-               this.options = options;
-               this.outputMap = outputMap;
-       }
-
-       private void writeObject(ObjectOutputStream out)
-                       throws IOException, ClassNotFoundException {
-               out.defaultWriteObject();
-               ObjectMapper mapper = new ObjectMapper();
-               mapper.writeValue(out, options);
-       }
-
-       private void readObject(ObjectInputStream in)
-                       throws IOException, ClassNotFoundException {
-               in.defaultReadObject();
-               ObjectMapper mapper = new ObjectMapper();
-               options = mapper.readValue(in, PipelineOptions.class);
-
-       }
-
-       @Override
-       public void mapPartition(Iterable<IN> values, Collector<RawUnionValue> 
out) throws Exception {
-               ProcessContext context = new ProcessContext(doFn, out);
-               this.doFn.startBundle(context);
-               for (IN value : values) {
-                       context.inValue = value;
-                       doFn.processElement(context);
-               }
-               this.doFn.finishBundle(context);
-       }
-
-       private class ProcessContext extends DoFn<IN, OUT>.ProcessContext {
-
-               IN inValue;
-               Collector<RawUnionValue> outCollector;
-
-               public ProcessContext(DoFn<IN, OUT> fn, 
Collector<RawUnionValue> outCollector) {
-                       fn.super();
-                       this.outCollector = outCollector;
-               }
-
-               @Override
-               public IN element() {
-                       return this.inValue;
-               }
-
-               @Override
-               public Instant timestamp() {
-                       return Instant.now();
-               }
-
-               @Override
-               public BoundedWindow window() {
-                       return GlobalWindow.INSTANCE;
-               }
-
-               @Override
-               public PaneInfo pane() {
-                       return PaneInfo.NO_FIRING;
-               }
-
-               @Override
-               public WindowingInternals<IN, OUT> windowingInternals() {
-                       return null;
-               }
-
-               @Override
-               public PipelineOptions getPipelineOptions() {
-                       return options;
-               }
-
-               @Override
-               public <T> T sideInput(PCollectionView<T> view) {
-                       List<T> sideInput = 
getRuntimeContext().getBroadcastVariable(view.getTagInternal()
-                                       .getId());
-                       List<WindowedValue<?>> windowedValueList = new 
ArrayList<>(sideInput.size());
-                       for (T input : sideInput) {
-                               windowedValueList.add(WindowedValue.of(input, 
Instant.now(), ImmutableList.of(GlobalWindow.INSTANCE), pane()));
-                       }
-                       return view.fromIterableInternal(windowedValueList);
-               }
-
-               @Override
-               public void output(OUT value) {
-                       // assume that index 0 is the default output
-                       outCollector.collect(new RawUnionValue(0, value));
-               }
-
-               @Override
-               public void outputWithTimestamp(OUT output, Instant timestamp) {
-                       // not FLink's way, just output normally
-                       output(output);
-               }
-
-               @Override
-               @SuppressWarnings("unchecked")
-               public <T> void sideOutput(TupleTag<T> tag, T value) {
-                       Integer index = outputMap.get(tag);
-                       if (index != null) {
-                               outCollector.collect(new RawUnionValue(index, 
value));
-                       }
-               }
-
-               @Override
-               public <T> void sideOutputWithTimestamp(TupleTag<T> tag, T 
output, Instant timestamp) {
-                       sideOutput(tag, output);
-               }
-
-               @Override
-               protected <AggInputT, AggOutputT> Aggregator<AggInputT, 
AggOutputT> createAggregatorInternal(String name, Combine.CombineFn<AggInputT, 
?, AggOutputT> combiner) {
-                       SerializableFnAggregatorWrapper<AggInputT, AggOutputT> 
wrapper = new SerializableFnAggregatorWrapper<>(combiner);
-                       getRuntimeContext().addAccumulator(name, wrapper);
-                       return null;
-               }
-
-       }
+  private final DoFn<IN, OUT> doFn;
+  private transient PipelineOptions options;
+  private final Map<TupleTag<?>, Integer> outputMap;
+
+  public FlinkMultiOutputDoFnFunction(DoFn<IN, OUT> doFn, PipelineOptions 
options, Map<TupleTag<?>, Integer> outputMap) {
+    this.doFn = doFn;
+    this.options = options;
+    this.outputMap = outputMap;
+  }
+
+  private void writeObject(ObjectOutputStream out)
+      throws IOException, ClassNotFoundException {
+    out.defaultWriteObject();
+    ObjectMapper mapper = new ObjectMapper();
+    mapper.writeValue(out, options);
+  }
+
+  private void readObject(ObjectInputStream in)
+      throws IOException, ClassNotFoundException {
+    in.defaultReadObject();
+    ObjectMapper mapper = new ObjectMapper();
+    options = mapper.readValue(in, PipelineOptions.class);
+
+  }
+
+  @Override
+  public void mapPartition(Iterable<IN> values, Collector<RawUnionValue> out) 
throws Exception {
+    ProcessContext context = new ProcessContext(doFn, out);
+    this.doFn.startBundle(context);
+    for (IN value : values) {
+      context.inValue = value;
+      doFn.processElement(context);
+    }
+    this.doFn.finishBundle(context);
+  }
+
+  private class ProcessContext extends DoFn<IN, OUT>.ProcessContext {
+
+    IN inValue;
+    Collector<RawUnionValue> outCollector;
+
+    public ProcessContext(DoFn<IN, OUT> fn, Collector<RawUnionValue> 
outCollector) {
+      fn.super();
+      this.outCollector = outCollector;
+    }
+
+    @Override
+    public IN element() {
+      return this.inValue;
+    }
+
+    @Override
+    public Instant timestamp() {
+      return Instant.now();
+    }
+
+    @Override
+    public BoundedWindow window() {
+      return GlobalWindow.INSTANCE;
+    }
+
+    @Override
+    public PaneInfo pane() {
+      return PaneInfo.NO_FIRING;
+    }
+
+    @Override
+    public WindowingInternals<IN, OUT> windowingInternals() {
+      return null;
+    }
+
+    @Override
+    public PipelineOptions getPipelineOptions() {
+      return options;
+    }
+
+    @Override
+    public <T> T sideInput(PCollectionView<T> view) {
+      List<T> sideInput = 
getRuntimeContext().getBroadcastVariable(view.getTagInternal()
+          .getId());
+      List<WindowedValue<?>> windowedValueList = new 
ArrayList<>(sideInput.size());
+      for (T input : sideInput) {
+        windowedValueList.add(WindowedValue.of(input, Instant.now(), 
ImmutableList.of(GlobalWindow.INSTANCE), pane()));
+      }
+      return view.fromIterableInternal(windowedValueList);
+    }
+
+    @Override
+    public void output(OUT value) {
+      // assume that index 0 is the default output
+      outCollector.collect(new RawUnionValue(0, value));
+    }
+
+    @Override
+    public void outputWithTimestamp(OUT output, Instant timestamp) {
+      // not FLink's way, just output normally
+      output(output);
+    }
+
+    @Override
+    @SuppressWarnings("unchecked")
+    public <T> void sideOutput(TupleTag<T> tag, T value) {
+      Integer index = outputMap.get(tag);
+      if (index != null) {
+        outCollector.collect(new RawUnionValue(index, value));
+      }
+    }
+
+    @Override
+    public <T> void sideOutputWithTimestamp(TupleTag<T> tag, T output, Instant 
timestamp) {
+      sideOutput(tag, output);
+    }
+
+    @Override
+    protected <AggInputT, AggOutputT> Aggregator<AggInputT, AggOutputT> 
createAggregatorInternal(String name, Combine.CombineFn<AggInputT, ?, 
AggOutputT> combiner) {
+      SerializableFnAggregatorWrapper<AggInputT, AggOutputT> wrapper = new 
SerializableFnAggregatorWrapper<>(combiner);
+      getRuntimeContext().addAccumulator(name, wrapper);
+      return null;
+    }
+
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/8852eb15/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/functions/FlinkMultiOutputPruningFunction.java
----------------------------------------------------------------------
diff --git 
a/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/functions/FlinkMultiOutputPruningFunction.java
 
b/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/functions/FlinkMultiOutputPruningFunction.java
index 6792b23..e883d42 100644
--- 
a/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/functions/FlinkMultiOutputPruningFunction.java
+++ 
b/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/functions/FlinkMultiOutputPruningFunction.java
@@ -25,17 +25,17 @@ import org.apache.flink.util.Collector;
  */
 public class FlinkMultiOutputPruningFunction<T> implements 
FlatMapFunction<RawUnionValue, T> {
 
-       private final int outputTag;
+  private final int outputTag;
 
-       public FlinkMultiOutputPruningFunction(int outputTag) {
-               this.outputTag = outputTag;
-       }
+  public FlinkMultiOutputPruningFunction(int outputTag) {
+    this.outputTag = outputTag;
+  }
 
-       @Override
-       @SuppressWarnings("unchecked")
-       public void flatMap(RawUnionValue rawUnionValue, Collector<T> 
collector) throws Exception {
-               if (rawUnionValue.getUnionTag() == outputTag) {
-                       collector.collect((T) rawUnionValue.getValue());
-               }
-       }
+  @Override
+  @SuppressWarnings("unchecked")
+  public void flatMap(RawUnionValue rawUnionValue, Collector<T> collector) 
throws Exception {
+    if (rawUnionValue.getUnionTag() == outputTag) {
+      collector.collect((T) rawUnionValue.getValue());
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/8852eb15/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/functions/FlinkPartialReduceFunction.java
----------------------------------------------------------------------
diff --git 
a/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/functions/FlinkPartialReduceFunction.java
 
b/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/functions/FlinkPartialReduceFunction.java
index ef47b72..1ff9aff 100644
--- 
a/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/functions/FlinkPartialReduceFunction.java
+++ 
b/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/functions/FlinkPartialReduceFunction.java
@@ -32,29 +32,29 @@ import java.util.Iterator;
  */
 public class FlinkPartialReduceFunction<K, VI, VA> implements GroupCombineFunction<KV<K, VI>, KV<K, VA>> {
 
-       private final Combine.KeyedCombineFn<K, VI, VA, ?> keyedCombineFn;
-
-       public FlinkPartialReduceFunction(Combine.KeyedCombineFn<K, VI, VA, ?>
-                                                         keyedCombineFn) {
-               this.keyedCombineFn = keyedCombineFn;
-       }
-
-       @Override
-       public void combine(Iterable<KV<K, VI>> elements, Collector<KV<K, VA>> out) throws Exception {
-
-               final Iterator<KV<K, VI>> iterator = elements.iterator();
-               // create accumulator using the first elements key
-               KV<K, VI> first = iterator.next();
-               K key = first.getKey();
-               VI value = first.getValue();
-               VA accumulator = keyedCombineFn.createAccumulator(key);
-               accumulator = keyedCombineFn.addInput(key, accumulator, value);
-
-               while(iterator.hasNext()) {
-                       value = iterator.next().getValue();
-                       accumulator = keyedCombineFn.addInput(key, accumulator, value);
-               }
-
-               out.collect(KV.of(key, accumulator));
-       }
+  private final Combine.KeyedCombineFn<K, VI, VA, ?> keyedCombineFn;
+
+  public FlinkPartialReduceFunction(Combine.KeyedCombineFn<K, VI, VA, ?>
+                                        keyedCombineFn) {
+    this.keyedCombineFn = keyedCombineFn;
+  }
+
+  @Override
+  public void combine(Iterable<KV<K, VI>> elements, Collector<KV<K, VA>> out) throws Exception {
+
+    final Iterator<KV<K, VI>> iterator = elements.iterator();
+    // create accumulator using the first elements key
+    KV<K, VI> first = iterator.next();
+    K key = first.getKey();
+    VI value = first.getValue();
+    VA accumulator = keyedCombineFn.createAccumulator(key);
+    accumulator = keyedCombineFn.addInput(key, accumulator, value);
+
+    while(iterator.hasNext()) {
+      value = iterator.next().getValue();
+      accumulator = keyedCombineFn.addInput(key, accumulator, value);
+    }
+
+    out.collect(KV.of(key, accumulator));
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/8852eb15/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/functions/FlinkReduceFunction.java
----------------------------------------------------------------------
diff --git a/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/functions/FlinkReduceFunction.java b/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/functions/FlinkReduceFunction.java
index cd0b38c..94676a2 100644
--- a/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/functions/FlinkReduceFunction.java
+++ b/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/functions/FlinkReduceFunction.java
@@ -33,25 +33,25 @@ import java.util.Iterator;
  */
 public class FlinkReduceFunction<K, VA, VO> implements GroupReduceFunction<KV<K, VA>, KV<K, VO>> {
 
-       private final Combine.KeyedCombineFn<K, ?, VA, VO> keyedCombineFn;
+  private final Combine.KeyedCombineFn<K, ?, VA, VO> keyedCombineFn;
 
-       public FlinkReduceFunction(Combine.KeyedCombineFn<K, ?, VA, VO> keyedCombineFn) {
-               this.keyedCombineFn = keyedCombineFn;
-       }
+  public FlinkReduceFunction(Combine.KeyedCombineFn<K, ?, VA, VO> keyedCombineFn) {
+    this.keyedCombineFn = keyedCombineFn;
+  }
 
-       @Override
-       public void reduce(Iterable<KV<K, VA>> values, Collector<KV<K, VO>> out) throws Exception {
-               Iterator<KV<K, VA>> it = values.iterator();
+  @Override
+  public void reduce(Iterable<KV<K, VA>> values, Collector<KV<K, VO>> out) throws Exception {
+    Iterator<KV<K, VA>> it = values.iterator();
 
-               KV<K, VA> current = it.next();
-               K k = current.getKey();
-               VA accumulator = current.getValue();
+    KV<K, VA> current = it.next();
+    K k = current.getKey();
+    VA accumulator = current.getValue();
 
-               while (it.hasNext()) {
-                       current = it.next();
-                       keyedCombineFn.mergeAccumulators(k, ImmutableList.of(accumulator, current.getValue()) );
-               }
+    while (it.hasNext()) {
+      current = it.next();
+      keyedCombineFn.mergeAccumulators(k, ImmutableList.of(accumulator, current.getValue()) );
+    }
 
-               out.collect(KV.of(k, keyedCombineFn.extractOutput(k, accumulator)));
-       }
+    out.collect(KV.of(k, keyedCombineFn.extractOutput(k, accumulator)));
+  }
 }

Reply via email to