http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/8852eb15/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/wrappers/streaming/state/StateCheckpointWriter.java
----------------------------------------------------------------------
diff --git 
a/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/wrappers/streaming/state/StateCheckpointWriter.java
 
b/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/wrappers/streaming/state/StateCheckpointWriter.java
index 1525c80..9f602fd 100644
--- 
a/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/wrappers/streaming/state/StateCheckpointWriter.java
+++ 
b/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/wrappers/streaming/state/StateCheckpointWriter.java
@@ -25,103 +25,103 @@ import java.util.concurrent.TimeUnit;
 
 public class StateCheckpointWriter {
 
-       private final AbstractStateBackend.CheckpointStateOutputView output;
-
-       public static StateCheckpointWriter 
create(AbstractStateBackend.CheckpointStateOutputView output) {
-               return new StateCheckpointWriter(output);
-       }
-
-       private 
StateCheckpointWriter(AbstractStateBackend.CheckpointStateOutputView output) {
-               this.output = output;
-       }
-
-       /////////                       Creating the serialized versions of the 
different types of state held by dataflow                       ///////
-
-       public StateCheckpointWriter addValueBuilder() throws IOException {
-               validate();
-               StateType.serialize(StateType.VALUE, this);
-               return this;
-       }
-
-       public StateCheckpointWriter addWatermarkHoldsBuilder() throws 
IOException {
-               validate();
-               StateType.serialize(StateType.WATERMARK, this);
-               return this;
-       }
-
-       public StateCheckpointWriter addListUpdatesBuilder() throws IOException 
{
-               validate();
-               StateType.serialize(StateType.LIST, this);
-               return this;
-       }
-
-       public StateCheckpointWriter addAccumulatorBuilder() throws IOException 
{
-               validate();
-               StateType.serialize(StateType.ACCUMULATOR, this);
-               return this;
-       }
-
-       /////////                       Setting the tag for a given state 
element                       ///////
-
-       public StateCheckpointWriter setTag(ByteString stateKey) throws 
IOException {
-               return writeData(stateKey.toByteArray());
-       }
-
-       public StateCheckpointWriter setTag(String stateKey) throws IOException 
{
-               output.writeUTF(stateKey);
-               return this;
-       }
-
-
-       public <K> StateCheckpointWriter serializeKey(K key, 
CoderTypeSerializer<K> keySerializer) throws IOException {
-               return serializeObject(key, keySerializer);
-       }
-
-       public <T> StateCheckpointWriter serializeObject(T object, 
CoderTypeSerializer<T> objectSerializer) throws IOException {
-               objectSerializer.serialize(object, output);
-               return this;
-       }
-
-       /////////                       Write the actual serialized data        
                //////////
-
-       public StateCheckpointWriter setData(ByteString data) throws 
IOException {
-               return writeData(data.toByteArray());
-       }
-
-       public StateCheckpointWriter setData(byte[] data) throws IOException {
-               return writeData(data);
-       }
-
-       public StateCheckpointWriter setTimestamp(Instant timestamp) throws 
IOException {
-               validate();
-               
output.writeLong(TimeUnit.MILLISECONDS.toMicros(timestamp.getMillis()));
-               return this;
-       }
-
-       public StateCheckpointWriter writeInt(int number) throws IOException {
-               validate();
-               output.writeInt(number);
-               return this;
-       }
-
-       public StateCheckpointWriter writeByte(byte b) throws IOException {
-               validate();
-               output.writeByte(b);
-               return this;
-       }
-
-       /////////                       Helper Methods                  ///////
-
-       private StateCheckpointWriter writeData(byte[] data) throws IOException 
{
-               validate();
-               output.writeInt(data.length);
-               output.write(data);
-               return this;
-       }
-
-       private void validate() {
-               if (this.output == null) {
-                       throw new RuntimeException("StateBackend not 
initialized yet.");
-               }
-       }
+  private final AbstractStateBackend.CheckpointStateOutputView output;
+
+  public static StateCheckpointWriter 
create(AbstractStateBackend.CheckpointStateOutputView output) {
+    return new StateCheckpointWriter(output);
+  }
+
+  private StateCheckpointWriter(AbstractStateBackend.CheckpointStateOutputView 
output) {
+    this.output = output;
+  }
+
+  /////////      Creating the serialized versions of the different types of 
state held by dataflow      ///////
+
+  public StateCheckpointWriter addValueBuilder() throws IOException {
+    validate();
+    StateType.serialize(StateType.VALUE, this);
+    return this;
+  }
+
+  public StateCheckpointWriter addWatermarkHoldsBuilder() throws IOException {
+    validate();
+    StateType.serialize(StateType.WATERMARK, this);
+    return this;
+  }
+
+  public StateCheckpointWriter addListUpdatesBuilder() throws IOException {
+    validate();
+    StateType.serialize(StateType.LIST, this);
+    return this;
+  }
+
+  public StateCheckpointWriter addAccumulatorBuilder() throws IOException {
+    validate();
+    StateType.serialize(StateType.ACCUMULATOR, this);
+    return this;
+  }
+
+  /////////      Setting the tag for a given state element      ///////
+
+  public StateCheckpointWriter setTag(ByteString stateKey) throws IOException {
+    return writeData(stateKey.toByteArray());
+  }
+
+  public StateCheckpointWriter setTag(String stateKey) throws IOException {
+    output.writeUTF(stateKey);
+    return this;
+  }
+
+
+  public <K> StateCheckpointWriter serializeKey(K key, CoderTypeSerializer<K> 
keySerializer) throws IOException {
+    return serializeObject(key, keySerializer);
+  }
+
+  public <T> StateCheckpointWriter serializeObject(T object, 
CoderTypeSerializer<T> objectSerializer) throws IOException {
+    objectSerializer.serialize(object, output);
+    return this;
+  }
+
+  /////////      Write the actual serialized data      //////////
+
+  public StateCheckpointWriter setData(ByteString data) throws IOException {
+    return writeData(data.toByteArray());
+  }
+
+  public StateCheckpointWriter setData(byte[] data) throws IOException {
+    return writeData(data);
+  }
+
+  public StateCheckpointWriter setTimestamp(Instant timestamp) throws 
IOException {
+    validate();
+    output.writeLong(TimeUnit.MILLISECONDS.toMicros(timestamp.getMillis()));
+    return this;
+  }
+
+  public StateCheckpointWriter writeInt(int number) throws IOException {
+    validate();
+    output.writeInt(number);
+    return this;
+  }
+
+  public StateCheckpointWriter writeByte(byte b) throws IOException {
+    validate();
+    output.writeByte(b);
+    return this;
+  }
+
+  /////////      Helper Methods      ///////
+
+  private StateCheckpointWriter writeData(byte[] data) throws IOException {
+    validate();
+    output.writeInt(data.length);
+    output.write(data);
+    return this;
+  }
+
+  private void validate() {
+    if (this.output == null) {
+      throw new RuntimeException("StateBackend not initialized yet.");
+    }
+  }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/8852eb15/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/wrappers/streaming/state/StateType.java
----------------------------------------------------------------------
diff --git 
a/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/wrappers/streaming/state/StateType.java
 
b/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/wrappers/streaming/state/StateType.java
index aa049ef..9e2c9f8 100644
--- 
a/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/wrappers/streaming/state/StateType.java
+++ 
b/runners/flink/src/main/java/com/dataartisans/flink/dataflow/translation/wrappers/streaming/state/StateType.java
@@ -23,49 +23,49 @@ import java.io.IOException;
  * */
 public enum StateType {
 
-       VALUE(0),
+  VALUE(0),
 
-       WATERMARK(1),
+  WATERMARK(1),
 
-       LIST(2),
+  LIST(2),
 
-       ACCUMULATOR(3);
+  ACCUMULATOR(3);
 
-       private final int numVal;
+  private final int numVal;
 
-       StateType(int value) {
-               this.numVal = value;
-       }
+  StateType(int value) {
+    this.numVal = value;
+  }
 
-       public static void serialize(StateType type, StateCheckpointWriter 
output) throws IOException {
-               if (output == null) {
-                       throw new IllegalArgumentException("Cannot write to a 
null output.");
-               }
+  public static void serialize(StateType type, StateCheckpointWriter output) 
throws IOException {
+    if (output == null) {
+      throw new IllegalArgumentException("Cannot write to a null output.");
+    }
 
-               if(type.numVal < 0 || type.numVal > 3) {
-                       throw new RuntimeException("Unknown State Type " + type 
+ ".");
-               }
+    if(type.numVal < 0 || type.numVal > 3) {
+      throw new RuntimeException("Unknown State Type " + type + ".");
+    }
 
-               output.writeByte((byte) type.numVal);
-       }
+    output.writeByte((byte) type.numVal);
+  }
 
-       public static StateType deserialize(StateCheckpointReader input) throws 
IOException {
-               if (input == null) {
-                       throw new IllegalArgumentException("Cannot read from a 
null input.");
-               }
+  public static StateType deserialize(StateCheckpointReader input) throws 
IOException {
+    if (input == null) {
+      throw new IllegalArgumentException("Cannot read from a null input.");
+    }
 
-               int typeInt = (int) input.getByte();
-               if(typeInt < 0 || typeInt > 3) {
-                       throw new RuntimeException("Unknown State Type " + 
typeInt + ".");
-               }
+    int typeInt = (int) input.getByte();
+    if(typeInt < 0 || typeInt > 3) {
+      throw new RuntimeException("Unknown State Type " + typeInt + ".");
+    }
 
-               StateType resultType = null;
-               for(StateType st: values()) {
-                       if(st.numVal == typeInt) {
-                               resultType = st;
-                               break;
-                       }
-               }
-               return resultType;
-       }
+    StateType resultType = null;
+    for(StateType st: values()) {
+      if(st.numVal == typeInt) {
+        resultType = st;
+        break;
+      }
+    }
+    return resultType;
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/8852eb15/runners/flink/src/test/java/com/dataartisans/flink/dataflow/AvroITCase.java
----------------------------------------------------------------------
diff --git 
a/runners/flink/src/test/java/com/dataartisans/flink/dataflow/AvroITCase.java 
b/runners/flink/src/test/java/com/dataartisans/flink/dataflow/AvroITCase.java
index ce53d44..3272975 100644
--- 
a/runners/flink/src/test/java/com/dataartisans/flink/dataflow/AvroITCase.java
+++ 
b/runners/flink/src/test/java/com/dataartisans/flink/dataflow/AvroITCase.java
@@ -30,70 +30,70 @@ import org.apache.flink.test.util.JavaProgramTestBase;
 
 public class AvroITCase extends JavaProgramTestBase {
 
-       protected String resultPath;
-       protected String tmpPath;
-
-       public AvroITCase(){
-       }
-
-       static final String[] EXPECTED_RESULT = new String[] {
-                       "Joe red 3",
-                       "Mary blue 4",
-                       "Mark green 1",
-                       "Julia purple 5"
-       };
-
-       @Override
-       protected void preSubmit() throws Exception {
-               resultPath = getTempDirPath("result");
-               tmpPath = getTempDirPath("tmp");
-
-       }
-
-       @Override
-       protected void postSubmit() throws Exception {
-               
compareResultsByLinesInMemory(Joiner.on('\n').join(EXPECTED_RESULT), 
resultPath);
-       }
-
-       @Override
-       protected void testProgram() throws Exception {
-               runProgram(tmpPath, resultPath);
-       }
-
-       private static void runProgram(String tmpPath, String resultPath) {
-               Pipeline p = FlinkTestPipeline.createForBatch();
-
-               p
-                       .apply(Create.of(
-                                       new User("Joe", 3, "red"),
-                                       new User("Mary", 4, "blue"),
-                                       new User("Mark", 1, "green"),
-                                       new User("Julia", 5, "purple"))
-                               .withCoder(AvroCoder.of(User.class)))
-
-                       .apply(AvroIO.Write.to(tmpPath)
-                               .withSchema(User.class));
-
-               p.run();
-
-               p = FlinkTestPipeline.createForBatch();
-
-               p
-                       
.apply(AvroIO.Read.from(tmpPath).withSchema(User.class).withoutValidation())
-
-                               .apply(ParDo.of(new DoFn<User, String>() {
-                                       @Override
-                                       public void 
processElement(ProcessContext c) throws Exception {
-                                               User u = c.element();
-                                               String result = u.getName() + " 
" + u.getFavoriteColor() + " " + u.getFavoriteNumber();
-                                               c.output(result);
-                                       }
-                               }))
-
-                       .apply(TextIO.Write.to(resultPath));
-
-               p.run();
-       }
+  protected String resultPath;
+  protected String tmpPath;
+
+  public AvroITCase(){
+  }
+
+  static final String[] EXPECTED_RESULT = new String[] {
+      "Joe red 3",
+      "Mary blue 4",
+      "Mark green 1",
+      "Julia purple 5"
+  };
+
+  @Override
+  protected void preSubmit() throws Exception {
+    resultPath = getTempDirPath("result");
+    tmpPath = getTempDirPath("tmp");
+
+  }
+
+  @Override
+  protected void postSubmit() throws Exception {
+    compareResultsByLinesInMemory(Joiner.on('\n').join(EXPECTED_RESULT), 
resultPath);
+  }
+
+  @Override
+  protected void testProgram() throws Exception {
+    runProgram(tmpPath, resultPath);
+  }
+
+  private static void runProgram(String tmpPath, String resultPath) {
+    Pipeline p = FlinkTestPipeline.createForBatch();
+
+    p
+      .apply(Create.of(
+          new User("Joe", 3, "red"),
+          new User("Mary", 4, "blue"),
+          new User("Mark", 1, "green"),
+          new User("Julia", 5, "purple"))
+        .withCoder(AvroCoder.of(User.class)))
+
+      .apply(AvroIO.Write.to(tmpPath)
+        .withSchema(User.class));
+
+    p.run();
+
+    p = FlinkTestPipeline.createForBatch();
+
+    p
+      
.apply(AvroIO.Read.from(tmpPath).withSchema(User.class).withoutValidation())
+
+        .apply(ParDo.of(new DoFn<User, String>() {
+          @Override
+          public void processElement(ProcessContext c) throws Exception {
+            User u = c.element();
+            String result = u.getName() + " " + u.getFavoriteColor() + " " + 
u.getFavoriteNumber();
+            c.output(result);
+          }
+        }))
+
+      .apply(TextIO.Write.to(resultPath));
+
+    p.run();
+  }
 
 }
 

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/8852eb15/runners/flink/src/test/java/com/dataartisans/flink/dataflow/FlattenizeITCase.java
----------------------------------------------------------------------
diff --git 
a/runners/flink/src/test/java/com/dataartisans/flink/dataflow/FlattenizeITCase.java
 
b/runners/flink/src/test/java/com/dataartisans/flink/dataflow/FlattenizeITCase.java
index 928388c..e65e497 100644
--- 
a/runners/flink/src/test/java/com/dataartisans/flink/dataflow/FlattenizeITCase.java
+++ 
b/runners/flink/src/test/java/com/dataartisans/flink/dataflow/FlattenizeITCase.java
@@ -26,47 +26,47 @@ import org.apache.flink.test.util.JavaProgramTestBase;
 
 public class FlattenizeITCase extends JavaProgramTestBase {
 
-       private String resultPath;
-       private String resultPath2;
+  private String resultPath;
+  private String resultPath2;
 
-       private static final String[] words = {"hello", "this", "is", "a", 
"DataSet!"};
-       private static final String[] words2 = {"hello", "this", "is", 
"another", "DataSet!"};
-       private static final String[] words3 = {"hello", "this", "is", "yet", 
"another", "DataSet!"};
+  private static final String[] words = {"hello", "this", "is", "a", 
"DataSet!"};
+  private static final String[] words2 = {"hello", "this", "is", "another", 
"DataSet!"};
+  private static final String[] words3 = {"hello", "this", "is", "yet", 
"another", "DataSet!"};
 
-       @Override
-       protected void preSubmit() throws Exception {
-               resultPath = getTempDirPath("result");
-               resultPath2 = getTempDirPath("result2");
-       }
+  @Override
+  protected void preSubmit() throws Exception {
+    resultPath = getTempDirPath("result");
+    resultPath2 = getTempDirPath("result2");
+  }
 
-       @Override
-       protected void postSubmit() throws Exception {
-               String join = Joiner.on('\n').join(words);
-               String join2 = Joiner.on('\n').join(words2);
-               String join3 = Joiner.on('\n').join(words3);
-               compareResultsByLinesInMemory(join + "\n" + join2, resultPath);
-               compareResultsByLinesInMemory(join + "\n" + join2 + "\n" + 
join3, resultPath2);
-       }
+  @Override
+  protected void postSubmit() throws Exception {
+    String join = Joiner.on('\n').join(words);
+    String join2 = Joiner.on('\n').join(words2);
+    String join3 = Joiner.on('\n').join(words3);
+    compareResultsByLinesInMemory(join + "\n" + join2, resultPath);
+    compareResultsByLinesInMemory(join + "\n" + join2 + "\n" + join3, 
resultPath2);
+  }
 
 
-       @Override
-       protected void testProgram() throws Exception {
-               Pipeline p = FlinkTestPipeline.createForBatch();
+  @Override
+  protected void testProgram() throws Exception {
+    Pipeline p = FlinkTestPipeline.createForBatch();
 
-               PCollection<String> p1 = p.apply(Create.of(words));
-               PCollection<String> p2 = p.apply(Create.of(words2));
+    PCollection<String> p1 = p.apply(Create.of(words));
+    PCollection<String> p2 = p.apply(Create.of(words2));
 
-               PCollectionList<String> list = PCollectionList.of(p1).and(p2);
+    PCollectionList<String> list = PCollectionList.of(p1).and(p2);
 
-               
list.apply(Flatten.<String>pCollections()).apply(TextIO.Write.to(resultPath));
+    
list.apply(Flatten.<String>pCollections()).apply(TextIO.Write.to(resultPath));
 
-               PCollection<String> p3 = p.apply(Create.of(words3));
+    PCollection<String> p3 = p.apply(Create.of(words3));
 
-               PCollectionList<String> list2 = list.and(p3);
+    PCollectionList<String> list2 = list.and(p3);
 
-               
list2.apply(Flatten.<String>pCollections()).apply(TextIO.Write.to(resultPath2));
+    
list2.apply(Flatten.<String>pCollections()).apply(TextIO.Write.to(resultPath2));
 
-               p.run();
-       }
+    p.run();
+  }
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/8852eb15/runners/flink/src/test/java/com/dataartisans/flink/dataflow/FlinkTestPipeline.java
----------------------------------------------------------------------
diff --git 
a/runners/flink/src/test/java/com/dataartisans/flink/dataflow/FlinkTestPipeline.java
 
b/runners/flink/src/test/java/com/dataartisans/flink/dataflow/FlinkTestPipeline.java
index 59c3b69..578e0e1 100644
--- 
a/runners/flink/src/test/java/com/dataartisans/flink/dataflow/FlinkTestPipeline.java
+++ 
b/runners/flink/src/test/java/com/dataartisans/flink/dataflow/FlinkTestPipeline.java
@@ -26,45 +26,45 @@ import com.google.cloud.dataflow.sdk.runners.PipelineRunner;
  */
 public class FlinkTestPipeline extends Pipeline {
 
-       /**
-        * Creates and returns a new test pipeline for batch execution.
-        *
-        * <p> Use {@link com.google.cloud.dataflow.sdk.testing.DataflowAssert} 
to add tests, then call
-        * {@link Pipeline#run} to execute the pipeline and check the tests.
-        */
-       public static FlinkTestPipeline createForBatch() {
-               return create(false);
-       }
+  /**
+   * Creates and returns a new test pipeline for batch execution.
+   *
+   * <p> Use {@link com.google.cloud.dataflow.sdk.testing.DataflowAssert} to 
add tests, then call
+   * {@link Pipeline#run} to execute the pipeline and check the tests.
+   */
+  public static FlinkTestPipeline createForBatch() {
+    return create(false);
+  }
 
-       /**
-        * Creates and returns a new test pipeline for streaming execution.
-        *
-        * <p> Use {@link com.google.cloud.dataflow.sdk.testing.DataflowAssert} 
to add tests, then call
-        * {@link Pipeline#run} to execute the pipeline and check the tests.
-        *
-        * @return The Test Pipeline
-        */
-       public static FlinkTestPipeline createForStreaming() {
-               return create(true);
-       }
+  /**
+   * Creates and returns a new test pipeline for streaming execution.
+   *
+   * <p> Use {@link com.google.cloud.dataflow.sdk.testing.DataflowAssert} to 
add tests, then call
+   * {@link Pipeline#run} to execute the pipeline and check the tests.
+   *
+   * @return The Test Pipeline
+   */
+  public static FlinkTestPipeline createForStreaming() {
+    return create(true);
+  }
 
-       /**
-        * Creates and returns a new test pipeline for streaming or batch 
execution.
-        *
-        * <p> Use {@link com.google.cloud.dataflow.sdk.testing.DataflowAssert} 
to add tests, then call
-        * {@link Pipeline#run} to execute the pipeline and check the tests.
-        *
-        * @param streaming <code>True</code> for streaming mode, 
<code>False</code> for batch.
-        * @return The Test Pipeline.
-        */
-       private static FlinkTestPipeline create(boolean streaming) {
-               FlinkPipelineRunner flinkRunner = 
FlinkPipelineRunner.createForTest(streaming);
-               return new FlinkTestPipeline(flinkRunner, 
flinkRunner.getPipelineOptions());
-       }
+  /**
+   * Creates and returns a new test pipeline for streaming or batch execution.
+   *
+   * <p> Use {@link com.google.cloud.dataflow.sdk.testing.DataflowAssert} to 
add tests, then call
+   * {@link Pipeline#run} to execute the pipeline and check the tests.
+   *
+   * @param streaming <code>True</code> for streaming mode, <code>False</code> 
for batch.
+   * @return The Test Pipeline.
+   */
+  private static FlinkTestPipeline create(boolean streaming) {
+    FlinkPipelineRunner flinkRunner = 
FlinkPipelineRunner.createForTest(streaming);
+    return new FlinkTestPipeline(flinkRunner, 
flinkRunner.getPipelineOptions());
+  }
 
-       private FlinkTestPipeline(PipelineRunner<? extends PipelineResult> 
runner,
-                                                       PipelineOptions 
options) {
-               super(runner, options);
-       }
+  private FlinkTestPipeline(PipelineRunner<? extends PipelineResult> runner,
+              PipelineOptions options) {
+    super(runner, options);
+  }
 }
 

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/8852eb15/runners/flink/src/test/java/com/dataartisans/flink/dataflow/JoinExamplesITCase.java
----------------------------------------------------------------------
diff --git 
a/runners/flink/src/test/java/com/dataartisans/flink/dataflow/JoinExamplesITCase.java
 
b/runners/flink/src/test/java/com/dataartisans/flink/dataflow/JoinExamplesITCase.java
index af0f217..28861ea 100644
--- 
a/runners/flink/src/test/java/com/dataartisans/flink/dataflow/JoinExamplesITCase.java
+++ 
b/runners/flink/src/test/java/com/dataartisans/flink/dataflow/JoinExamplesITCase.java
@@ -34,66 +34,66 @@ import java.util.List;
  */
 public class JoinExamplesITCase extends JavaProgramTestBase {
 
-       protected String resultPath;
-
-       public JoinExamplesITCase(){
-       }
-
-       private static final TableRow row1 = new TableRow()
-                       .set("ActionGeo_CountryCode", "VM").set("SQLDATE", 
"20141212")
-                       .set("Actor1Name", "BANGKOK").set("SOURCEURL", 
"http://cnn.com";);
-       private static final TableRow row2 = new TableRow()
-                       .set("ActionGeo_CountryCode", "VM").set("SQLDATE", 
"20141212")
-                       .set("Actor1Name", "LAOS").set("SOURCEURL", 
"http://www.chicagotribune.com";);
-       private static final TableRow row3 = new TableRow()
-                       .set("ActionGeo_CountryCode", "BE").set("SQLDATE", 
"20141213")
-                       .set("Actor1Name", "AFGHANISTAN").set("SOURCEURL", 
"http://cnn.com";);
-       static final TableRow[] EVENTS = new TableRow[] {
-                       row1, row2, row3
-       };
-       static final List<TableRow> EVENT_ARRAY = Arrays.asList(EVENTS);
-
-       private static final TableRow cc1 = new TableRow()
-                       .set("FIPSCC", "VM").set("HumanName", "Vietnam");
-       private static final TableRow cc2 = new TableRow()
-                       .set("FIPSCC", "BE").set("HumanName", "Belgium");
-       static final TableRow[] CCS = new TableRow[] {
-                       cc1, cc2
-       };
-       static final List<TableRow> CC_ARRAY = Arrays.asList(CCS);
-
-       static final String[] JOINED_EVENTS = new String[] {
-                       "Country code: VM, Country name: Vietnam, Event info: 
Date: 20141212, Actor1: LAOS, "
-                                       + "url: http://www.chicagotribune.com";,
-                       "Country code: VM, Country name: Vietnam, Event info: 
Date: 20141212, Actor1: BANGKOK, "
-                                       + "url: http://cnn.com";,
-                       "Country code: BE, Country name: Belgium, Event info: 
Date: 20141213, Actor1: AFGHANISTAN, "
-                                       + "url: http://cnn.com";
-       };
-
-       @Override
-       protected void preSubmit() throws Exception {
-               resultPath = getTempDirPath("result");
-       }
-
-       @Override
-       protected void postSubmit() throws Exception {
-               
compareResultsByLinesInMemory(Joiner.on('\n').join(JOINED_EVENTS), resultPath);
-       }
-
-       @Override
-       protected void testProgram() throws Exception {
-
-               Pipeline p = FlinkTestPipeline.createForBatch();
-
-               PCollection<TableRow> input1 = p.apply(Create.of(EVENT_ARRAY));
-               PCollection<TableRow> input2 = p.apply(Create.of(CC_ARRAY));
-
-               PCollection<String> output = JoinExamples.joinEvents(input1, 
input2);
-
-               output.apply(TextIO.Write.to(resultPath));
-
-               p.run();
-       }
+  protected String resultPath;
+
+  public JoinExamplesITCase(){
+  }
+
+  private static final TableRow row1 = new TableRow()
+      .set("ActionGeo_CountryCode", "VM").set("SQLDATE", "20141212")
+      .set("Actor1Name", "BANGKOK").set("SOURCEURL", "http://cnn.com";);
+  private static final TableRow row2 = new TableRow()
+      .set("ActionGeo_CountryCode", "VM").set("SQLDATE", "20141212")
+      .set("Actor1Name", "LAOS").set("SOURCEURL", 
"http://www.chicagotribune.com";);
+  private static final TableRow row3 = new TableRow()
+      .set("ActionGeo_CountryCode", "BE").set("SQLDATE", "20141213")
+      .set("Actor1Name", "AFGHANISTAN").set("SOURCEURL", "http://cnn.com";);
+  static final TableRow[] EVENTS = new TableRow[] {
+      row1, row2, row3
+  };
+  static final List<TableRow> EVENT_ARRAY = Arrays.asList(EVENTS);
+
+  private static final TableRow cc1 = new TableRow()
+      .set("FIPSCC", "VM").set("HumanName", "Vietnam");
+  private static final TableRow cc2 = new TableRow()
+      .set("FIPSCC", "BE").set("HumanName", "Belgium");
+  static final TableRow[] CCS = new TableRow[] {
+      cc1, cc2
+  };
+  static final List<TableRow> CC_ARRAY = Arrays.asList(CCS);
+
+  static final String[] JOINED_EVENTS = new String[] {
+      "Country code: VM, Country name: Vietnam, Event info: Date: 20141212, 
Actor1: LAOS, "
+          + "url: http://www.chicagotribune.com";,
+      "Country code: VM, Country name: Vietnam, Event info: Date: 20141212, 
Actor1: BANGKOK, "
+          + "url: http://cnn.com";,
+      "Country code: BE, Country name: Belgium, Event info: Date: 20141213, 
Actor1: AFGHANISTAN, "
+          + "url: http://cnn.com";
+  };
+
+  @Override
+  protected void preSubmit() throws Exception {
+    resultPath = getTempDirPath("result");
+  }
+
+  @Override
+  protected void postSubmit() throws Exception {
+    compareResultsByLinesInMemory(Joiner.on('\n').join(JOINED_EVENTS), 
resultPath);
+  }
+
+  @Override
+  protected void testProgram() throws Exception {
+
+    Pipeline p = FlinkTestPipeline.createForBatch();
+
+    PCollection<TableRow> input1 = p.apply(Create.of(EVENT_ARRAY));
+    PCollection<TableRow> input2 = p.apply(Create.of(CC_ARRAY));
+
+    PCollection<String> output = JoinExamples.joinEvents(input1, input2);
+
+    output.apply(TextIO.Write.to(resultPath));
+
+    p.run();
+  }
 }
 

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/8852eb15/runners/flink/src/test/java/com/dataartisans/flink/dataflow/MaybeEmptyTestITCase.java
----------------------------------------------------------------------
diff --git 
a/runners/flink/src/test/java/com/dataartisans/flink/dataflow/MaybeEmptyTestITCase.java
 
b/runners/flink/src/test/java/com/dataartisans/flink/dataflow/MaybeEmptyTestITCase.java
index 35f2eaf..d1652e7 100644
--- 
a/runners/flink/src/test/java/com/dataartisans/flink/dataflow/MaybeEmptyTestITCase.java
+++ 
b/runners/flink/src/test/java/com/dataartisans/flink/dataflow/MaybeEmptyTestITCase.java
@@ -27,37 +27,37 @@ import java.io.Serializable;
 
 public class MaybeEmptyTestITCase extends JavaProgramTestBase implements 
Serializable {
 
-       protected String resultPath;
-
-       protected final String expected = "test";
-
-       public MaybeEmptyTestITCase() {
-       }
-
-       @Override
-       protected void preSubmit() throws Exception {
-               resultPath = getTempDirPath("result");
-       }
-
-       @Override
-       protected void postSubmit() throws Exception {
-               compareResultsByLinesInMemory(expected, resultPath);
-       }
-
-       @Override
-       protected void testProgram() throws Exception {
-
-               Pipeline p = FlinkTestPipeline.createForBatch();
-
-               p.apply(Create.of((Void) null)).setCoder(VoidCoder.of())
-                               .apply(ParDo.of(
-                                               new DoFn<Void, String>() {
-                                                       @Override
-                                                       public void 
processElement(DoFn<Void, String>.ProcessContext c) {
-                                                               
c.output(expected);
-                                                       }
-                                               
})).apply(TextIO.Write.to(resultPath));
-               p.run();
-       }
+  protected String resultPath;
+
+  protected final String expected = "test";
+
+  public MaybeEmptyTestITCase() {
+  }
+
+  @Override
+  protected void preSubmit() throws Exception {
+    resultPath = getTempDirPath("result");
+  }
+
+  @Override
+  protected void postSubmit() throws Exception {
+    compareResultsByLinesInMemory(expected, resultPath);
+  }
+
+  @Override
+  protected void testProgram() throws Exception {
+
+    Pipeline p = FlinkTestPipeline.createForBatch();
+
+    p.apply(Create.of((Void) null)).setCoder(VoidCoder.of())
+        .apply(ParDo.of(
+            new DoFn<Void, String>() {
+              @Override
+              public void processElement(DoFn<Void, String>.ProcessContext c) {
+                c.output(expected);
+              }
+            })).apply(TextIO.Write.to(resultPath));
+    p.run();
+  }
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/8852eb15/runners/flink/src/test/java/com/dataartisans/flink/dataflow/ParDoMultiOutputITCase.java
----------------------------------------------------------------------
diff --git 
a/runners/flink/src/test/java/com/dataartisans/flink/dataflow/ParDoMultiOutputITCase.java
 
b/runners/flink/src/test/java/com/dataartisans/flink/dataflow/ParDoMultiOutputITCase.java
index ccdbbf9..d8087d6 100644
--- 
a/runners/flink/src/test/java/com/dataartisans/flink/dataflow/ParDoMultiOutputITCase.java
+++ 
b/runners/flink/src/test/java/com/dataartisans/flink/dataflow/ParDoMultiOutputITCase.java
@@ -31,68 +31,68 @@ import java.io.Serializable;
 
 public class ParDoMultiOutputITCase extends JavaProgramTestBase implements 
Serializable {
 
-       private String resultPath;
-
-       private static String[] expectedWords = {"MAAA", "MAAFOOO"};
-
-       @Override
-       protected void preSubmit() throws Exception {
-               resultPath = getTempDirPath("result");
-       }
-
-       @Override
-       protected void postSubmit() throws Exception {
-               
compareResultsByLinesInMemory(Joiner.on("\n").join(expectedWords), resultPath);
-       }
-
-       @Override
-       protected void testProgram() throws Exception {
-               Pipeline p = FlinkTestPipeline.createForBatch();
-
-               PCollection<String> words = p.apply(Create.of("Hello", 
"Whatupmyman", "hey", "SPECIALthere", "MAAA", "MAAFOOO"));
-
-               // Select words whose length is below a cut off,
-               // plus the lengths of words that are above the cut off.
-               // Also select words starting with "MARKER".
-               final int wordLengthCutOff = 3;
-               // Create tags to use for the main and side outputs.
-               final TupleTag<String> wordsBelowCutOffTag = new 
TupleTag<String>(){};
-               final TupleTag<Integer> wordLengthsAboveCutOffTag = new 
TupleTag<Integer>(){};
-               final TupleTag<String> markedWordsTag = new 
TupleTag<String>(){};
-
-               PCollectionTuple results =
-                               words.apply(ParDo
-                                               
.withOutputTags(wordsBelowCutOffTag, TupleTagList.of(wordLengthsAboveCutOffTag)
-                                                               
.and(markedWordsTag))
-                                               .of(new DoFn<String, String>() {
-                                                       final TupleTag<String> 
specialWordsTag = new TupleTag<String>() {
-                                                       };
-
-                                                       public void 
processElement(ProcessContext c) {
-                                                               String word = 
c.element();
-                                                               if 
(word.length() <= wordLengthCutOff) {
-                                                                       
c.output(word);
-                                                               } else {
-                                                                       
c.sideOutput(wordLengthsAboveCutOffTag, word.length());
-                                                               }
-                                                               if 
(word.startsWith("MAA")) {
-                                                                       
c.sideOutput(markedWordsTag, word);
-                                                               }
-
-                                                               if 
(word.startsWith("SPECIAL")) {
-                                                                       
c.sideOutput(specialWordsTag, word);
-                                                               }
-                                                       }
-                                               }));
-
-               // Extract the PCollection results, by tag.
-               PCollection<String> wordsBelowCutOff = 
results.get(wordsBelowCutOffTag);
-               PCollection<Integer> wordLengthsAboveCutOff = results.get
-                               (wordLengthsAboveCutOffTag);
-               PCollection<String> markedWords = results.get(markedWordsTag);
-
-               markedWords.apply(TextIO.Write.to(resultPath));
-
-               p.run();
-       }
+  private String resultPath;
+
+  private static String[] expectedWords = {"MAAA", "MAAFOOO"};
+
+  @Override
+  protected void preSubmit() throws Exception {
+    resultPath = getTempDirPath("result");
+  }
+
+  @Override
+  protected void postSubmit() throws Exception {
+    compareResultsByLinesInMemory(Joiner.on("\n").join(expectedWords), 
resultPath);
+  }
+
+  @Override
+  protected void testProgram() throws Exception {
+    Pipeline p = FlinkTestPipeline.createForBatch();
+
+    PCollection<String> words = p.apply(Create.of("Hello", "Whatupmyman", 
"hey", "SPECIALthere", "MAAA", "MAAFOOO"));
+
+    // Select words whose length is below a cut off,
+    // plus the lengths of words that are above the cut off.
+    // Also select words starting with "MARKER".
+    final int wordLengthCutOff = 3;
+    // Create tags to use for the main and side outputs.
+    final TupleTag<String> wordsBelowCutOffTag = new TupleTag<String>(){};
+    final TupleTag<Integer> wordLengthsAboveCutOffTag = new 
TupleTag<Integer>(){};
+    final TupleTag<String> markedWordsTag = new TupleTag<String>(){};
+
+    PCollectionTuple results =
+        words.apply(ParDo
+            .withOutputTags(wordsBelowCutOffTag, 
TupleTagList.of(wordLengthsAboveCutOffTag)
+                .and(markedWordsTag))
+            .of(new DoFn<String, String>() {
+              final TupleTag<String> specialWordsTag = new TupleTag<String>() {
+              };
+
+              public void processElement(ProcessContext c) {
+                String word = c.element();
+                if (word.length() <= wordLengthCutOff) {
+                  c.output(word);
+                } else {
+                  c.sideOutput(wordLengthsAboveCutOffTag, word.length());
+                }
+                if (word.startsWith("MAA")) {
+                  c.sideOutput(markedWordsTag, word);
+                }
+
+                if (word.startsWith("SPECIAL")) {
+                  c.sideOutput(specialWordsTag, word);
+                }
+              }
+            }));
+
+    // Extract the PCollection results, by tag.
+    PCollection<String> wordsBelowCutOff = results.get(wordsBelowCutOffTag);
+    PCollection<Integer> wordLengthsAboveCutOff = results.get
+        (wordLengthsAboveCutOffTag);
+    PCollection<String> markedWords = results.get(markedWordsTag);
+
+    markedWords.apply(TextIO.Write.to(resultPath));
+
+    p.run();
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/8852eb15/runners/flink/src/test/java/com/dataartisans/flink/dataflow/ReadSourceITCase.java
----------------------------------------------------------------------
diff --git 
a/runners/flink/src/test/java/com/dataartisans/flink/dataflow/ReadSourceITCase.java
 
b/runners/flink/src/test/java/com/dataartisans/flink/dataflow/ReadSourceITCase.java
index 3569a78..5a46359 100644
--- 
a/runners/flink/src/test/java/com/dataartisans/flink/dataflow/ReadSourceITCase.java
+++ 
b/runners/flink/src/test/java/com/dataartisans/flink/dataflow/ReadSourceITCase.java
@@ -36,128 +36,128 @@ import java.util.List;
 
 public class ReadSourceITCase extends JavaProgramTestBase {
 
-       protected String resultPath;
-
-       public ReadSourceITCase(){
-       }
-
-       static final String[] EXPECTED_RESULT = new String[] {
-                       "1", "2", "3", "4", "5", "6", "7", "8", "9"};
-
-       @Override
-       protected void preSubmit() throws Exception {
-               resultPath = getTempDirPath("result");
-       }
-
-       @Override
-       protected void postSubmit() throws Exception {
-               
compareResultsByLinesInMemory(Joiner.on('\n').join(EXPECTED_RESULT), 
resultPath);
-       }
-
-       @Override
-       protected void testProgram() throws Exception {
-               runProgram(resultPath);
-       }
-
-       private static void runProgram(String resultPath) {
-
-               Pipeline p = FlinkTestPipeline.createForBatch();
-
-               PCollection<String> result = p
-                               .apply(Read.from(new ReadSource(1, 10)))
-                               .apply(ParDo.of(new DoFn<Integer, String>() {
-                                       @Override
-                                       public void 
processElement(ProcessContext c) throws Exception {
-                                               
c.output(c.element().toString());
-                                       }
-                               }));
-
-               result.apply(TextIO.Write.to(resultPath));
-               p.run();
-       }
-
-
-       private static class ReadSource extends BoundedSource<Integer> {
-               final int from;
-               final int to;
-
-               ReadSource(int from, int to) {
-                       this.from = from;
-                       this.to = to;
-               }
-
-               @Override
-               public List<ReadSource> splitIntoBundles(long 
desiredShardSizeBytes, PipelineOptions options)
-                               throws Exception {
-                       List<ReadSource> res = new ArrayList<>();
-                       FlinkPipelineOptions flinkOptions = 
options.as(FlinkPipelineOptions.class);
-                       int numWorkers = flinkOptions.getParallelism();
-                       Preconditions.checkArgument(numWorkers > 0, "Number of 
workers should be larger than 0.");
-
-                       float step = 1.0f * (to - from) / numWorkers;
-                       for (int i = 0; i < numWorkers; ++i) {
-                               res.add(new ReadSource(Math.round(from + i * 
step), Math.round(from + (i + 1) * step)));
-                       }
-                       return res;
-               }
-
-               @Override
-               public long getEstimatedSizeBytes(PipelineOptions options) 
throws Exception {
-                       return 8 * (to - from);
-               }
-
-               @Override
-               public boolean producesSortedKeys(PipelineOptions options) 
throws Exception {
-                       return true;
-               }
-
-               @Override
-               public BoundedReader<Integer> createReader(PipelineOptions 
options) throws IOException {
-                       return new RangeReader(this);
-               }
-
-               @Override
-               public void validate() {}
-
-               @Override
-               public Coder<Integer> getDefaultOutputCoder() {
-                       return BigEndianIntegerCoder.of();
-               }
-
-               private class RangeReader extends BoundedReader<Integer> {
-                       private int current;
-
-                       public RangeReader(ReadSource source) {
-                               this.current = source.from - 1;
-                       }
-
-                       @Override
-                       public boolean start() throws IOException {
-                               return true;
-                       }
-
-                       @Override
-                       public boolean advance() throws IOException {
-                               current++;
-                               return (current < to);
-                       }
-
-                       @Override
-                       public Integer getCurrent() {
-                               return current;
-                       }
-
-                       @Override
-                       public void close() throws IOException {
-                               // Nothing
-                       }
-
-                       @Override
-                       public BoundedSource<Integer> getCurrentSource() {
-                               return ReadSource.this;
-                       }
-               }
-       }
+  protected String resultPath;
+
+  public ReadSourceITCase(){
+  }
+
+  static final String[] EXPECTED_RESULT = new String[] {
+      "1", "2", "3", "4", "5", "6", "7", "8", "9"};
+
+  @Override
+  protected void preSubmit() throws Exception {
+    resultPath = getTempDirPath("result");
+  }
+
+  @Override
+  protected void postSubmit() throws Exception {
+    compareResultsByLinesInMemory(Joiner.on('\n').join(EXPECTED_RESULT), 
resultPath);
+  }
+
+  @Override
+  protected void testProgram() throws Exception {
+    runProgram(resultPath);
+  }
+
+  private static void runProgram(String resultPath) {
+
+    Pipeline p = FlinkTestPipeline.createForBatch();
+
+    PCollection<String> result = p
+        .apply(Read.from(new ReadSource(1, 10)))
+        .apply(ParDo.of(new DoFn<Integer, String>() {
+          @Override
+          public void processElement(ProcessContext c) throws Exception {
+            c.output(c.element().toString());
+          }
+        }));
+
+    result.apply(TextIO.Write.to(resultPath));
+    p.run();
+  }
+
+
+  private static class ReadSource extends BoundedSource<Integer> {
+    final int from;
+    final int to;
+
+    ReadSource(int from, int to) {
+      this.from = from;
+      this.to = to;
+    }
+
+    @Override
+    public List<ReadSource> splitIntoBundles(long desiredShardSizeBytes, 
PipelineOptions options)
+        throws Exception {
+      List<ReadSource> res = new ArrayList<>();
+      FlinkPipelineOptions flinkOptions = 
options.as(FlinkPipelineOptions.class);
+      int numWorkers = flinkOptions.getParallelism();
+      Preconditions.checkArgument(numWorkers > 0, "Number of workers should be 
larger than 0.");
+
+      float step = 1.0f * (to - from) / numWorkers;
+      for (int i = 0; i < numWorkers; ++i) {
+        res.add(new ReadSource(Math.round(from + i * step), Math.round(from + 
(i + 1) * step)));
+      }
+      return res;
+    }
+
+    @Override
+    public long getEstimatedSizeBytes(PipelineOptions options) throws 
Exception {
+      return 8 * (to - from);
+    }
+
+    @Override
+    public boolean producesSortedKeys(PipelineOptions options) throws 
Exception {
+      return true;
+    }
+
+    @Override
+    public BoundedReader<Integer> createReader(PipelineOptions options) throws 
IOException {
+      return new RangeReader(this);
+    }
+
+    @Override
+    public void validate() {}
+
+    @Override
+    public Coder<Integer> getDefaultOutputCoder() {
+      return BigEndianIntegerCoder.of();
+    }
+
+    private class RangeReader extends BoundedReader<Integer> {
+      private int current;
+
+      public RangeReader(ReadSource source) {
+        this.current = source.from - 1;
+      }
+
+      @Override
+      public boolean start() throws IOException {
+        return true;
+      }
+
+      @Override
+      public boolean advance() throws IOException {
+        current++;
+        return (current < to);
+      }
+
+      @Override
+      public Integer getCurrent() {
+        return current;
+      }
+
+      @Override
+      public void close() throws IOException {
+        // Nothing
+      }
+
+      @Override
+      public BoundedSource<Integer> getCurrentSource() {
+        return ReadSource.this;
+      }
+    }
+  }
 }
 
 

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/8852eb15/runners/flink/src/test/java/com/dataartisans/flink/dataflow/RemoveDuplicatesEmptyITCase.java
----------------------------------------------------------------------
diff --git 
a/runners/flink/src/test/java/com/dataartisans/flink/dataflow/RemoveDuplicatesEmptyITCase.java
 
b/runners/flink/src/test/java/com/dataartisans/flink/dataflow/RemoveDuplicatesEmptyITCase.java
index db794f7..615f194 100644
--- 
a/runners/flink/src/test/java/com/dataartisans/flink/dataflow/RemoveDuplicatesEmptyITCase.java
+++ 
b/runners/flink/src/test/java/com/dataartisans/flink/dataflow/RemoveDuplicatesEmptyITCase.java
@@ -30,39 +30,39 @@ import java.util.List;
 
 public class RemoveDuplicatesEmptyITCase extends JavaProgramTestBase {
 
-       protected String resultPath;
+  protected String resultPath;
 
-       public RemoveDuplicatesEmptyITCase(){
-       }
+  public RemoveDuplicatesEmptyITCase(){
+  }
 
-       static final String[] EXPECTED_RESULT = new String[] {};
+  static final String[] EXPECTED_RESULT = new String[] {};
 
-       @Override
-       protected void preSubmit() throws Exception {
-               resultPath = getTempDirPath("result");
-       }
+  @Override
+  protected void preSubmit() throws Exception {
+    resultPath = getTempDirPath("result");
+  }
 
-       @Override
-       protected void postSubmit() throws Exception {
-               
compareResultsByLinesInMemory(Joiner.on('\n').join(EXPECTED_RESULT), 
resultPath);
-       }
+  @Override
+  protected void postSubmit() throws Exception {
+    compareResultsByLinesInMemory(Joiner.on('\n').join(EXPECTED_RESULT), 
resultPath);
+  }
 
-       @Override
-       protected void testProgram() throws Exception {
+  @Override
+  protected void testProgram() throws Exception {
 
-               List<String> strings = Collections.emptyList();
+    List<String> strings = Collections.emptyList();
 
-               Pipeline p = FlinkTestPipeline.createForBatch();
+    Pipeline p = FlinkTestPipeline.createForBatch();
 
-               PCollection<String> input =
-                               p.apply(Create.of(strings))
-                                               .setCoder(StringUtf8Coder.of());
+    PCollection<String> input =
+        p.apply(Create.of(strings))
+            .setCoder(StringUtf8Coder.of());
 
-               PCollection<String> output =
-                               input.apply(RemoveDuplicates.<String>create());
+    PCollection<String> output =
+        input.apply(RemoveDuplicates.<String>create());
 
-               output.apply(TextIO.Write.to(resultPath));
-               p.run();
-       }
+    output.apply(TextIO.Write.to(resultPath));
+    p.run();
+  }
 }
 

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/8852eb15/runners/flink/src/test/java/com/dataartisans/flink/dataflow/RemoveDuplicatesITCase.java
----------------------------------------------------------------------
diff --git 
a/runners/flink/src/test/java/com/dataartisans/flink/dataflow/RemoveDuplicatesITCase.java
 
b/runners/flink/src/test/java/com/dataartisans/flink/dataflow/RemoveDuplicatesITCase.java
index 04e06b8..8c19f2c 100644
--- 
a/runners/flink/src/test/java/com/dataartisans/flink/dataflow/RemoveDuplicatesITCase.java
+++ 
b/runners/flink/src/test/java/com/dataartisans/flink/dataflow/RemoveDuplicatesITCase.java
@@ -30,40 +30,40 @@ import java.util.List;
 
 public class RemoveDuplicatesITCase extends JavaProgramTestBase {
 
-       protected String resultPath;
+  protected String resultPath;
 
-       public RemoveDuplicatesITCase(){
-       }
+  public RemoveDuplicatesITCase(){
+  }
 
-       static final String[] EXPECTED_RESULT = new String[] {
-                       "k1", "k5", "k2", "k3"};
+  static final String[] EXPECTED_RESULT = new String[] {
+      "k1", "k5", "k2", "k3"};
 
-       @Override
-       protected void preSubmit() throws Exception {
-               resultPath = getTempDirPath("result");
-       }
+  @Override
+  protected void preSubmit() throws Exception {
+    resultPath = getTempDirPath("result");
+  }
 
-       @Override
-       protected void postSubmit() throws Exception {
-               
compareResultsByLinesInMemory(Joiner.on('\n').join(EXPECTED_RESULT), 
resultPath);
-       }
+  @Override
+  protected void postSubmit() throws Exception {
+    compareResultsByLinesInMemory(Joiner.on('\n').join(EXPECTED_RESULT), 
resultPath);
+  }
 
-       @Override
-       protected void testProgram() throws Exception {
+  @Override
+  protected void testProgram() throws Exception {
 
-               List<String> strings = Arrays.asList("k1", "k5", "k5", "k2", 
"k1", "k2", "k3");
+    List<String> strings = Arrays.asList("k1", "k5", "k5", "k2", "k1", "k2", 
"k3");
 
-               Pipeline p = FlinkTestPipeline.createForBatch();
+    Pipeline p = FlinkTestPipeline.createForBatch();
 
-               PCollection<String> input =
-                               p.apply(Create.of(strings))
-                                               .setCoder(StringUtf8Coder.of());
+    PCollection<String> input =
+        p.apply(Create.of(strings))
+            .setCoder(StringUtf8Coder.of());
 
-               PCollection<String> output =
-                               input.apply(RemoveDuplicates.<String>create());
+    PCollection<String> output =
+        input.apply(RemoveDuplicates.<String>create());
 
-               output.apply(TextIO.Write.to(resultPath));
-               p.run();
-       }
+    output.apply(TextIO.Write.to(resultPath));
+    p.run();
+  }
 }
 

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/8852eb15/runners/flink/src/test/java/com/dataartisans/flink/dataflow/SideInputITCase.java
----------------------------------------------------------------------
diff --git 
a/runners/flink/src/test/java/com/dataartisans/flink/dataflow/SideInputITCase.java
 
b/runners/flink/src/test/java/com/dataartisans/flink/dataflow/SideInputITCase.java
index ee8843c..7c3d6f9 100644
--- 
a/runners/flink/src/test/java/com/dataartisans/flink/dataflow/SideInputITCase.java
+++ 
b/runners/flink/src/test/java/com/dataartisans/flink/dataflow/SideInputITCase.java
@@ -28,40 +28,40 @@ import java.io.Serializable;
 
 public class SideInputITCase extends JavaProgramTestBase implements 
Serializable {
 
-       private static final String expected = "Hello!";
+  private static final String expected = "Hello!";
 
-       protected String resultPath;
+  protected String resultPath;
 
-       @Override
-       protected void testProgram() throws Exception {
+  @Override
+  protected void testProgram() throws Exception {
 
 
-               Pipeline p = FlinkTestPipeline.createForBatch();
+    Pipeline p = FlinkTestPipeline.createForBatch();
 
 
-               final PCollectionView<String> sidesInput = p
-                               .apply(Create.of(expected))
-                               .apply(View.<String>asSingleton());
+    final PCollectionView<String> sidesInput = p
+        .apply(Create.of(expected))
+        .apply(View.<String>asSingleton());
 
-               p.apply(Create.of("bli"))
-                               .apply(ParDo.of(new DoFn<String, String>() {
-                                       @Override
-                                       public void 
processElement(ProcessContext c) throws Exception {
-                                               String s = 
c.sideInput(sidesInput);
-                                               c.output(s);
-                                       }
-                               
}).withSideInputs(sidesInput)).apply(TextIO.Write.to(resultPath));
+    p.apply(Create.of("bli"))
+        .apply(ParDo.of(new DoFn<String, String>() {
+          @Override
+          public void processElement(ProcessContext c) throws Exception {
+            String s = c.sideInput(sidesInput);
+            c.output(s);
+          }
+        }).withSideInputs(sidesInput)).apply(TextIO.Write.to(resultPath));
 
-               p.run();
-       }
+    p.run();
+  }
 
-       @Override
-       protected void preSubmit() throws Exception {
-               resultPath = getTempDirPath("result");
-       }
+  @Override
+  protected void preSubmit() throws Exception {
+    resultPath = getTempDirPath("result");
+  }
 
-       @Override
-       protected void postSubmit() throws Exception {
-               compareResultsByLinesInMemory(expected, resultPath);
-       }
+  @Override
+  protected void postSubmit() throws Exception {
+    compareResultsByLinesInMemory(expected, resultPath);
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/8852eb15/runners/flink/src/test/java/com/dataartisans/flink/dataflow/TfIdfITCase.java
----------------------------------------------------------------------
diff --git 
a/runners/flink/src/test/java/com/dataartisans/flink/dataflow/TfIdfITCase.java 
b/runners/flink/src/test/java/com/dataartisans/flink/dataflow/TfIdfITCase.java
index 07c1294..715d0be 100644
--- 
a/runners/flink/src/test/java/com/dataartisans/flink/dataflow/TfIdfITCase.java
+++ 
b/runners/flink/src/test/java/com/dataartisans/flink/dataflow/TfIdfITCase.java
@@ -32,45 +32,45 @@ import java.net.URI;
 
 public class TfIdfITCase extends JavaProgramTestBase {
 
-       protected String resultPath;
+  protected String resultPath;
 
-       public TfIdfITCase(){
-       }
+  public TfIdfITCase(){
+  }
 
-       static final String[] EXPECTED_RESULT = new String[] {
-                       "a", "m", "n", "b", "c", "d"};
+  static final String[] EXPECTED_RESULT = new String[] {
+      "a", "m", "n", "b", "c", "d"};
 
-       @Override
-       protected void preSubmit() throws Exception {
-               resultPath = getTempDirPath("result");
-       }
+  @Override
+  protected void preSubmit() throws Exception {
+    resultPath = getTempDirPath("result");
+  }
 
-       @Override
-       protected void postSubmit() throws Exception {
-               
compareResultsByLinesInMemory(Joiner.on('\n').join(EXPECTED_RESULT), 
resultPath);
-       }
+  @Override
+  protected void postSubmit() throws Exception {
+    compareResultsByLinesInMemory(Joiner.on('\n').join(EXPECTED_RESULT), 
resultPath);
+  }
 
-       @Override
-       protected void testProgram() throws Exception {
+  @Override
+  protected void testProgram() throws Exception {
 
-               Pipeline pipeline = FlinkTestPipeline.createForBatch();
+    Pipeline pipeline = FlinkTestPipeline.createForBatch();
 
-               pipeline.getCoderRegistry().registerCoder(URI.class, 
StringDelegateCoder.of(URI.class));
+    pipeline.getCoderRegistry().registerCoder(URI.class, 
StringDelegateCoder.of(URI.class));
 
-               PCollection<KV<String, KV<URI, Double>>> wordToUriAndTfIdf = 
pipeline
-                               .apply(Create.of(
-                                               KV.of(new URI("x"), "a b c d"),
-                                               KV.of(new URI("y"), "a b c"),
-                                               KV.of(new URI("z"), "a m n")))
-                               .apply(new TFIDF.ComputeTfIdf());
+    PCollection<KV<String, KV<URI, Double>>> wordToUriAndTfIdf = pipeline
+        .apply(Create.of(
+            KV.of(new URI("x"), "a b c d"),
+            KV.of(new URI("y"), "a b c"),
+            KV.of(new URI("z"), "a m n")))
+        .apply(new TFIDF.ComputeTfIdf());
 
-               PCollection<String> words = wordToUriAndTfIdf
-                               .apply(Keys.<String>create())
-                               .apply(RemoveDuplicates.<String>create());
+    PCollection<String> words = wordToUriAndTfIdf
+        .apply(Keys.<String>create())
+        .apply(RemoveDuplicates.<String>create());
 
-               words.apply(TextIO.Write.to(resultPath));
+    words.apply(TextIO.Write.to(resultPath));
 
-               pipeline.run();
-       }
+    pipeline.run();
+  }
 }
 

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/8852eb15/runners/flink/src/test/java/com/dataartisans/flink/dataflow/WordCountITCase.java
----------------------------------------------------------------------
diff --git 
a/runners/flink/src/test/java/com/dataartisans/flink/dataflow/WordCountITCase.java
 
b/runners/flink/src/test/java/com/dataartisans/flink/dataflow/WordCountITCase.java
index 9188097..f1a2454 100644
--- 
a/runners/flink/src/test/java/com/dataartisans/flink/dataflow/WordCountITCase.java
+++ 
b/runners/flink/src/test/java/com/dataartisans/flink/dataflow/WordCountITCase.java
@@ -32,43 +32,43 @@ import java.util.List;
 
 public class WordCountITCase extends JavaProgramTestBase {
 
-       protected String resultPath;
+  protected String resultPath;
 
-       public WordCountITCase(){
-       }
+  public WordCountITCase(){
+  }
 
-       static final String[] WORDS_ARRAY = new String[] {
-                       "hi there", "hi", "hi sue bob",
-                       "hi sue", "", "bob hi"};
+  static final String[] WORDS_ARRAY = new String[] {
+      "hi there", "hi", "hi sue bob",
+      "hi sue", "", "bob hi"};
 
-       static final List<String> WORDS = Arrays.asList(WORDS_ARRAY);
+  static final List<String> WORDS = Arrays.asList(WORDS_ARRAY);
 
-       static final String[] COUNTS_ARRAY = new String[] {
-                       "hi: 5", "there: 1", "sue: 2", "bob: 2"};
+  static final String[] COUNTS_ARRAY = new String[] {
+      "hi: 5", "there: 1", "sue: 2", "bob: 2"};
 
-       @Override
-       protected void preSubmit() throws Exception {
-               resultPath = getTempDirPath("result");
-       }
+  @Override
+  protected void preSubmit() throws Exception {
+    resultPath = getTempDirPath("result");
+  }
 
-       @Override
-       protected void postSubmit() throws Exception {
-               
compareResultsByLinesInMemory(Joiner.on('\n').join(COUNTS_ARRAY), resultPath);
-       }
+  @Override
+  protected void postSubmit() throws Exception {
+    compareResultsByLinesInMemory(Joiner.on('\n').join(COUNTS_ARRAY), 
resultPath);
+  }
 
-       @Override
-       protected void testProgram() throws Exception {
+  @Override
+  protected void testProgram() throws Exception {
 
-               Pipeline p = FlinkTestPipeline.createForBatch();
+    Pipeline p = FlinkTestPipeline.createForBatch();
 
-               PCollection<String> input = 
p.apply(Create.of(WORDS)).setCoder(StringUtf8Coder.of());
+    PCollection<String> input = 
p.apply(Create.of(WORDS)).setCoder(StringUtf8Coder.of());
 
-               input
-                               .apply(new WordCount.CountWords())
-                               .apply(MapElements.via(new 
WordCount.FormatAsTextFn()))
-                               .apply(TextIO.Write.to(resultPath));
+    input
+        .apply(new WordCount.CountWords())
+        .apply(MapElements.via(new WordCount.FormatAsTextFn()))
+        .apply(TextIO.Write.to(resultPath));
 
-               p.run();
-       }
+    p.run();
+  }
 }
 

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/8852eb15/runners/flink/src/test/java/com/dataartisans/flink/dataflow/WordCountJoin2ITCase.java
----------------------------------------------------------------------
diff --git 
a/runners/flink/src/test/java/com/dataartisans/flink/dataflow/WordCountJoin2ITCase.java
 
b/runners/flink/src/test/java/com/dataartisans/flink/dataflow/WordCountJoin2ITCase.java
index ccc52c4..1cac036 100644
--- 
a/runners/flink/src/test/java/com/dataartisans/flink/dataflow/WordCountJoin2ITCase.java
+++ 
b/runners/flink/src/test/java/com/dataartisans/flink/dataflow/WordCountJoin2ITCase.java
@@ -33,104 +33,104 @@ import org.apache.flink.test.util.JavaProgramTestBase;
 
 public class WordCountJoin2ITCase extends JavaProgramTestBase {
 
-       static final String[] WORDS_1 = new String[] {
-                       "hi there", "hi", "hi sue bob",
-                       "hi sue", "", "bob hi"};
-
-       static final String[] WORDS_2 = new String[] {
-                       "hi tim", "beauty", "hooray sue bob",
-                       "hi there", "", "please say hi"};
-
-       static final String[] RESULTS = new String[] {
-                       "beauty -> Tag1: Tag2: 1",
-                       "bob -> Tag1: 2 Tag2: 1",
-                       "hi -> Tag1: 5 Tag2: 3",
-                       "hooray -> Tag1: Tag2: 1",
-                       "please -> Tag1: Tag2: 1",
-                       "say -> Tag1: Tag2: 1",
-                       "sue -> Tag1: 2 Tag2: 1",
-                       "there -> Tag1: 1 Tag2: 1",
-                       "tim -> Tag1: Tag2: 1"
-       };
-
-       static final TupleTag<Long> tag1 = new TupleTag<>("Tag1");
-       static final TupleTag<Long> tag2 = new TupleTag<>("Tag2");
-
-       protected String resultPath;
-
-       @Override
-       protected void preSubmit() throws Exception {
-               resultPath = getTempDirPath("result");
-       }
-
-       @Override
-       protected void postSubmit() throws Exception {
-               compareResultsByLinesInMemory(Joiner.on('\n').join(RESULTS), 
resultPath);
-       }
-
-       @Override
-       protected void testProgram() throws Exception {
-               Pipeline p = FlinkTestPipeline.createForBatch();
-
-               /* Create two PCollections and join them */
-               PCollection<KV<String,Long>> occurences1 = 
p.apply(Create.of(WORDS_1))
-                               .apply(ParDo.of(new ExtractWordsFn()))
-                               .apply(Count.<String>perElement());
-
-               PCollection<KV<String,Long>> occurences2 = 
p.apply(Create.of(WORDS_2))
-                               .apply(ParDo.of(new ExtractWordsFn()))
-                               .apply(Count.<String>perElement());
-
-               /* CoGroup the two collections */
-               PCollection<KV<String, CoGbkResult>> mergedOccurences = 
KeyedPCollectionTuple
-                               .of(tag1, occurences1)
-                               .and(tag2, occurences2)
-                               .apply(CoGroupByKey.<String>create());
-
-               /* Format output */
-               mergedOccurences.apply(ParDo.of(new FormatCountsFn()))
-                               
.apply(TextIO.Write.named("test").to(resultPath));
-
-               p.run();
-       }
-
-
-       static class ExtractWordsFn extends DoFn<String, String> {
-
-               @Override
-               public void startBundle(Context c) {
-               }
-
-               @Override
-               public void processElement(ProcessContext c) {
-                       // Split the line into words.
-                       String[] words = c.element().split("[^a-zA-Z']+");
-
-                       // Output each word encountered into the output 
PCollection.
-                       for (String word : words) {
-                               if (!word.isEmpty()) {
-                                       c.output(word);
-                               }
-                       }
-               }
-       }
-
-       static class FormatCountsFn extends DoFn<KV<String, CoGbkResult>, 
String> {
-               @Override
-               public void processElement(ProcessContext c) {
-                       CoGbkResult value = c.element().getValue();
-                       String key = c.element().getKey();
-                       String countTag1 = tag1.getId() + ": ";
-                       String countTag2 = tag2.getId() + ": ";
-                       for (Long count : value.getAll(tag1)) {
-                               countTag1 += count + " ";
-                       }
-                       for (Long count : value.getAll(tag2)) {
-                               countTag2 += count;
-                       }
-                       c.output(key + " -> " + countTag1 + countTag2);
-               }
-       }
+  static final String[] WORDS_1 = new String[] {
+      "hi there", "hi", "hi sue bob",
+      "hi sue", "", "bob hi"};
+
+  static final String[] WORDS_2 = new String[] {
+      "hi tim", "beauty", "hooray sue bob",
+      "hi there", "", "please say hi"};
+
+  static final String[] RESULTS = new String[] {
+      "beauty -> Tag1: Tag2: 1",
+      "bob -> Tag1: 2 Tag2: 1",
+      "hi -> Tag1: 5 Tag2: 3",
+      "hooray -> Tag1: Tag2: 1",
+      "please -> Tag1: Tag2: 1",
+      "say -> Tag1: Tag2: 1",
+      "sue -> Tag1: 2 Tag2: 1",
+      "there -> Tag1: 1 Tag2: 1",
+      "tim -> Tag1: Tag2: 1"
+  };
+
+  static final TupleTag<Long> tag1 = new TupleTag<>("Tag1");
+  static final TupleTag<Long> tag2 = new TupleTag<>("Tag2");
+
+  protected String resultPath;
+
+  @Override
+  protected void preSubmit() throws Exception {
+    resultPath = getTempDirPath("result");
+  }
+
+  @Override
+  protected void postSubmit() throws Exception {
+    compareResultsByLinesInMemory(Joiner.on('\n').join(RESULTS), resultPath);
+  }
+
+  @Override
+  protected void testProgram() throws Exception {
+    Pipeline p = FlinkTestPipeline.createForBatch();
+
+    /* Create two PCollections and join them */
+    PCollection<KV<String,Long>> occurences1 = p.apply(Create.of(WORDS_1))
+        .apply(ParDo.of(new ExtractWordsFn()))
+        .apply(Count.<String>perElement());
+
+    PCollection<KV<String,Long>> occurences2 = p.apply(Create.of(WORDS_2))
+        .apply(ParDo.of(new ExtractWordsFn()))
+        .apply(Count.<String>perElement());
+
+    /* CoGroup the two collections */
+    PCollection<KV<String, CoGbkResult>> mergedOccurences = 
KeyedPCollectionTuple
+        .of(tag1, occurences1)
+        .and(tag2, occurences2)
+        .apply(CoGroupByKey.<String>create());
+
+    /* Format output */
+    mergedOccurences.apply(ParDo.of(new FormatCountsFn()))
+        .apply(TextIO.Write.named("test").to(resultPath));
+
+    p.run();
+  }
+
+
+  static class ExtractWordsFn extends DoFn<String, String> {
+
+    @Override
+    public void startBundle(Context c) {
+    }
+
+    @Override
+    public void processElement(ProcessContext c) {
+      // Split the line into words.
+      String[] words = c.element().split("[^a-zA-Z']+");
+
+      // Output each word encountered into the output PCollection.
+      for (String word : words) {
+        if (!word.isEmpty()) {
+          c.output(word);
+        }
+      }
+    }
+  }
+
+  static class FormatCountsFn extends DoFn<KV<String, CoGbkResult>, String> {
+    @Override
+    public void processElement(ProcessContext c) {
+      CoGbkResult value = c.element().getValue();
+      String key = c.element().getKey();
+      String countTag1 = tag1.getId() + ": ";
+      String countTag2 = tag2.getId() + ": ";
+      for (Long count : value.getAll(tag1)) {
+        countTag1 += count + " ";
+      }
+      for (Long count : value.getAll(tag2)) {
+        countTag2 += count;
+      }
+      c.output(key + " -> " + countTag1 + countTag2);
+    }
+  }
 
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/8852eb15/runners/flink/src/test/java/com/dataartisans/flink/dataflow/WordCountJoin3ITCase.java
----------------------------------------------------------------------
diff --git 
a/runners/flink/src/test/java/com/dataartisans/flink/dataflow/WordCountJoin3ITCase.java
 
b/runners/flink/src/test/java/com/dataartisans/flink/dataflow/WordCountJoin3ITCase.java
index e6eddc0..4c8b99b 100644
--- 
a/runners/flink/src/test/java/com/dataartisans/flink/dataflow/WordCountJoin3ITCase.java
+++ 
b/runners/flink/src/test/java/com/dataartisans/flink/dataflow/WordCountJoin3ITCase.java
@@ -33,122 +33,122 @@ import org.apache.flink.test.util.JavaProgramTestBase;
 
 public class WordCountJoin3ITCase extends JavaProgramTestBase {
 
-       static final String[] WORDS_1 = new String[] {
-                       "hi there", "hi", "hi sue bob",
-                       "hi sue", "", "bob hi"};
-
-       static final String[] WORDS_2 = new String[] {
-                       "hi tim", "beauty", "hooray sue bob",
-                       "hi there", "", "please say hi"};
-
-       static final String[] WORDS_3 = new String[] {
-                       "hi stephan", "beauty", "hooray big fabian",
-                       "hi yo", "", "please say hi"};
-
-       static final String[] RESULTS = new String[] {
-                       "beauty -> Tag1: Tag2: 1 Tag3: 1",
-                       "bob -> Tag1: 2 Tag2: 1 Tag3: ",
-                       "hi -> Tag1: 5 Tag2: 3 Tag3: 3",
-                       "hooray -> Tag1: Tag2: 1 Tag3: 1",
-                       "please -> Tag1: Tag2: 1 Tag3: 1",
-                       "say -> Tag1: Tag2: 1 Tag3: 1",
-                       "sue -> Tag1: 2 Tag2: 1 Tag3: ",
-                       "there -> Tag1: 1 Tag2: 1 Tag3: ",
-                       "tim -> Tag1: Tag2: 1 Tag3: ",
-                       "stephan -> Tag1: Tag2: Tag3: 1",
-                       "yo -> Tag1: Tag2: Tag3: 1",
-                       "fabian -> Tag1: Tag2: Tag3: 1",
-                       "big -> Tag1: Tag2: Tag3: 1"
-       };
-
-       static final TupleTag<Long> tag1 = new TupleTag<>("Tag1");
-       static final TupleTag<Long> tag2 = new TupleTag<>("Tag2");
-       static final TupleTag<Long> tag3 = new TupleTag<>("Tag3");
-
-       protected String resultPath;
-
-       @Override
-       protected void preSubmit() throws Exception {
-               resultPath = getTempDirPath("result");
-       }
-
-       @Override
-       protected void postSubmit() throws Exception {
-               compareResultsByLinesInMemory(Joiner.on('\n').join(RESULTS), 
resultPath);
-       }
-
-       @Override
-       protected void testProgram() throws Exception {
-
-               Pipeline p = FlinkTestPipeline.createForBatch();
-
-               /* Create two PCollections and join them */
-               PCollection<KV<String,Long>> occurences1 = 
p.apply(Create.of(WORDS_1))
-                               .apply(ParDo.of(new ExtractWordsFn()))
-                               .apply(Count.<String>perElement());
-
-               PCollection<KV<String,Long>> occurences2 = 
p.apply(Create.of(WORDS_2))
-                               .apply(ParDo.of(new ExtractWordsFn()))
-                               .apply(Count.<String>perElement());
-
-               PCollection<KV<String,Long>> occurences3 = 
p.apply(Create.of(WORDS_3))
-                               .apply(ParDo.of(new ExtractWordsFn()))
-                               .apply(Count.<String>perElement());
-
-               /* CoGroup the two collections */
-               PCollection<KV<String, CoGbkResult>> mergedOccurences = 
KeyedPCollectionTuple
-                               .of(tag1, occurences1)
-                               .and(tag2, occurences2)
-                               .and(tag3, occurences3)
-                               .apply(CoGroupByKey.<String>create());
-
-               /* Format output */
-               mergedOccurences.apply(ParDo.of(new FormatCountsFn()))
-                               
.apply(TextIO.Write.named("test").to(resultPath));
-
-               p.run();
-       }
-
-
-       static class ExtractWordsFn extends DoFn<String, String> {
-
-               @Override
-               public void startBundle(Context c) {
-               }
-
-               @Override
-               public void processElement(ProcessContext c) {
-                       // Split the line into words.
-                       String[] words = c.element().split("[^a-zA-Z']+");
-
-                       // Output each word encountered into the output 
PCollection.
-                       for (String word : words) {
-                               if (!word.isEmpty()) {
-                                       c.output(word);
-                               }
-                       }
-               }
-       }
-
-       static class FormatCountsFn extends DoFn<KV<String, CoGbkResult>, 
String> {
-               @Override
-               public void processElement(ProcessContext c) {
-                       CoGbkResult value = c.element().getValue();
-                       String key = c.element().getKey();
-                       String countTag1 = tag1.getId() + ": ";
-                       String countTag2 = tag2.getId() + ": ";
-                       String countTag3 = tag3.getId() + ": ";
-                       for (Long count : value.getAll(tag1)) {
-                               countTag1 += count + " ";
-                       }
-                       for (Long count : value.getAll(tag2)) {
-                               countTag2 += count + " ";
-                       }
-                       for (Long count : value.getAll(tag3)) {
-                               countTag3 += count;
-                       }
-                       c.output(key + " -> " + countTag1 + countTag2 + 
countTag3);
-               }
-       }
+  static final String[] WORDS_1 = new String[] {
+      "hi there", "hi", "hi sue bob",
+      "hi sue", "", "bob hi"};
+
+  static final String[] WORDS_2 = new String[] {
+      "hi tim", "beauty", "hooray sue bob",
+      "hi there", "", "please say hi"};
+
+  static final String[] WORDS_3 = new String[] {
+      "hi stephan", "beauty", "hooray big fabian",
+      "hi yo", "", "please say hi"};
+
+  static final String[] RESULTS = new String[] {
+      "beauty -> Tag1: Tag2: 1 Tag3: 1",
+      "bob -> Tag1: 2 Tag2: 1 Tag3: ",
+      "hi -> Tag1: 5 Tag2: 3 Tag3: 3",
+      "hooray -> Tag1: Tag2: 1 Tag3: 1",
+      "please -> Tag1: Tag2: 1 Tag3: 1",
+      "say -> Tag1: Tag2: 1 Tag3: 1",
+      "sue -> Tag1: 2 Tag2: 1 Tag3: ",
+      "there -> Tag1: 1 Tag2: 1 Tag3: ",
+      "tim -> Tag1: Tag2: 1 Tag3: ",
+      "stephan -> Tag1: Tag2: Tag3: 1",
+      "yo -> Tag1: Tag2: Tag3: 1",
+      "fabian -> Tag1: Tag2: Tag3: 1",
+      "big -> Tag1: Tag2: Tag3: 1"
+  };
+
+  static final TupleTag<Long> tag1 = new TupleTag<>("Tag1");
+  static final TupleTag<Long> tag2 = new TupleTag<>("Tag2");
+  static final TupleTag<Long> tag3 = new TupleTag<>("Tag3");
+
+  protected String resultPath;
+
+  @Override
+  protected void preSubmit() throws Exception {
+    resultPath = getTempDirPath("result");
+  }
+
+  @Override
+  protected void postSubmit() throws Exception {
+    compareResultsByLinesInMemory(Joiner.on('\n').join(RESULTS), resultPath);
+  }
+
+  @Override
+  protected void testProgram() throws Exception {
+
+    Pipeline p = FlinkTestPipeline.createForBatch();
+
+    /* Create two PCollections and join them */
+    PCollection<KV<String,Long>> occurences1 = p.apply(Create.of(WORDS_1))
+        .apply(ParDo.of(new ExtractWordsFn()))
+        .apply(Count.<String>perElement());
+
+    PCollection<KV<String,Long>> occurences2 = p.apply(Create.of(WORDS_2))
+        .apply(ParDo.of(new ExtractWordsFn()))
+        .apply(Count.<String>perElement());
+
+    PCollection<KV<String,Long>> occurences3 = p.apply(Create.of(WORDS_3))
+        .apply(ParDo.of(new ExtractWordsFn()))
+        .apply(Count.<String>perElement());
+
+    /* CoGroup the two collections */
+    PCollection<KV<String, CoGbkResult>> mergedOccurences = 
KeyedPCollectionTuple
+        .of(tag1, occurences1)
+        .and(tag2, occurences2)
+        .and(tag3, occurences3)
+        .apply(CoGroupByKey.<String>create());
+
+    /* Format output */
+    mergedOccurences.apply(ParDo.of(new FormatCountsFn()))
+        .apply(TextIO.Write.named("test").to(resultPath));
+
+    p.run();
+  }
+
+
+  static class ExtractWordsFn extends DoFn<String, String> {
+
+    @Override
+    public void startBundle(Context c) {
+    }
+
+    @Override
+    public void processElement(ProcessContext c) {
+      // Split the line into words.
+      String[] words = c.element().split("[^a-zA-Z']+");
+
+      // Output each word encountered into the output PCollection.
+      for (String word : words) {
+        if (!word.isEmpty()) {
+          c.output(word);
+        }
+      }
+    }
+  }
+
+  static class FormatCountsFn extends DoFn<KV<String, CoGbkResult>, String> {
+    @Override
+    public void processElement(ProcessContext c) {
+      CoGbkResult value = c.element().getValue();
+      String key = c.element().getKey();
+      String countTag1 = tag1.getId() + ": ";
+      String countTag2 = tag2.getId() + ": ";
+      String countTag3 = tag3.getId() + ": ";
+      for (Long count : value.getAll(tag1)) {
+        countTag1 += count + " ";
+      }
+      for (Long count : value.getAll(tag2)) {
+        countTag2 += count + " ";
+      }
+      for (Long count : value.getAll(tag3)) {
+        countTag3 += count;
+      }
+      c.output(key + " -> " + countTag1 + countTag2 + countTag3);
+    }
+  }
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/8852eb15/runners/flink/src/test/java/com/dataartisans/flink/dataflow/WriteSinkITCase.java
----------------------------------------------------------------------
diff --git 
a/runners/flink/src/test/java/com/dataartisans/flink/dataflow/WriteSinkITCase.java
 
b/runners/flink/src/test/java/com/dataartisans/flink/dataflow/WriteSinkITCase.java
index 865fc5f..a61bf52 100644
--- 
a/runners/flink/src/test/java/com/dataartisans/flink/dataflow/WriteSinkITCase.java
+++ 
b/runners/flink/src/test/java/com/dataartisans/flink/dataflow/WriteSinkITCase.java
@@ -39,118 +39,118 @@ import static org.junit.Assert.*;
  */
 public class WriteSinkITCase extends JavaProgramTestBase {
 
-       protected String resultPath;
+  protected String resultPath;
 
-       public WriteSinkITCase(){
-       }
+  public WriteSinkITCase(){
+  }
 
-       static final String[] EXPECTED_RESULT = new String[] {
-                       "Joe red 3", "Mary blue 4", "Max yellow 23"};
+  static final String[] EXPECTED_RESULT = new String[] {
+      "Joe red 3", "Mary blue 4", "Max yellow 23"};
 
-       @Override
-       protected void preSubmit() throws Exception {
-               resultPath = getTempDirPath("result");
-       }
-
-       @Override
-       protected void postSubmit() throws Exception {
-               
compareResultsByLinesInMemory(Joiner.on('\n').join(EXPECTED_RESULT), 
resultPath);
-       }
-
-       @Override
-       protected void testProgram() throws Exception {
-               runProgram(resultPath);
-       }
-
-       private static void runProgram(String resultPath) {
-               Pipeline p = FlinkTestPipeline.createForBatch();
-
-               
p.apply(Create.of(EXPECTED_RESULT)).setCoder(StringUtf8Coder.of())
-                       .apply("CustomSink", Write.to(new 
MyCustomSink(resultPath)));
-
-               p.run();
-       }
-
-       /**
-        * Simple custom sink which writes to a file.
-        */
-       private static class MyCustomSink extends Sink<String> {
-
-               private final String resultPath;
-
-               public MyCustomSink(String resultPath) {
-                       this.resultPath = resultPath;
-               }
-
-               @Override
-               public void validate(PipelineOptions options) {
-                       assertNotNull(options);
-               }
-
-               @Override
-               public WriteOperation<String, ?> 
createWriteOperation(PipelineOptions options) {
-                       return new MyWriteOperation();
-               }
-
-               private class MyWriteOperation extends WriteOperation<String, 
String> {
-
-                       @Override
-                       public Coder<String> getWriterResultCoder() {
-                               return StringUtf8Coder.of();
-                       }
-
-                       @Override
-                       public void initialize(PipelineOptions options) throws 
Exception {
-
-                       }
-
-                       @Override
-                       public void finalize(Iterable<String> writerResults, 
PipelineOptions options) throws Exception {
-
-                       }
-
-                       @Override
-                       public Writer<String, String> 
createWriter(PipelineOptions options) throws Exception {
-                               return new MyWriter();
-                       }
-
-                       @Override
-                       public Sink<String> getSink() {
-                               return MyCustomSink.this;
-                       }
-
-                       /**
-                        * Simple Writer which writes to a file.
-                        */
-                       private class MyWriter extends Writer<String, String> {
-
-                               private PrintWriter internalWriter;
-
-                               @Override
-                               public void open(String uId) throws Exception {
-                                       Path path = new Path(resultPath + "/" + 
uId);
-                                       FileSystem.get(new 
URI("file:///")).create(path, false);
-                                       internalWriter = new PrintWriter(new 
File(path.toUri()));
-                               }
-
-                               @Override
-                               public void write(String value) throws 
Exception {
-                                       internalWriter.println(value);
-                               }
-
-                               @Override
-                               public String close() throws Exception {
-                                       internalWriter.close();
-                                       return resultPath;
-                               }
-
-                               @Override
-                               public WriteOperation<String, String> 
getWriteOperation() {
-                                       return MyWriteOperation.this;
-                               }
-                       }
-               }
-       }
+  @Override
+  protected void preSubmit() throws Exception {
+    resultPath = getTempDirPath("result");
+  }
+
+  @Override
+  protected void postSubmit() throws Exception {
+    compareResultsByLinesInMemory(Joiner.on('\n').join(EXPECTED_RESULT), 
resultPath);
+  }
+
+  @Override
+  protected void testProgram() throws Exception {
+    runProgram(resultPath);
+  }
+
+  private static void runProgram(String resultPath) {
+    Pipeline p = FlinkTestPipeline.createForBatch();
+
+    p.apply(Create.of(EXPECTED_RESULT)).setCoder(StringUtf8Coder.of())
+      .apply("CustomSink", Write.to(new MyCustomSink(resultPath)));
+
+    p.run();
+  }
+
+  /**
+   * Simple custom sink which writes to a file.
+   */
+  private static class MyCustomSink extends Sink<String> {
+
+    private final String resultPath;
+
+    public MyCustomSink(String resultPath) {
+      this.resultPath = resultPath;
+    }
+
+    @Override
+    public void validate(PipelineOptions options) {
+      assertNotNull(options);
+    }
+
+    @Override
+    public WriteOperation<String, ?> createWriteOperation(PipelineOptions 
options) {
+      return new MyWriteOperation();
+    }
+
+    private class MyWriteOperation extends WriteOperation<String, String> {
+
+      @Override
+      public Coder<String> getWriterResultCoder() {
+        return StringUtf8Coder.of();
+      }
+
+      @Override
+      public void initialize(PipelineOptions options) throws Exception {
+
+      }
+
+      @Override
+      public void finalize(Iterable<String> writerResults, PipelineOptions 
options) throws Exception {
+
+      }
+
+      @Override
+      public Writer<String, String> createWriter(PipelineOptions options) 
throws Exception {
+        return new MyWriter();
+      }
+
+      @Override
+      public Sink<String> getSink() {
+        return MyCustomSink.this;
+      }
+
+      /**
+       * Simple Writer which writes to a file.
+       */
+      private class MyWriter extends Writer<String, String> {
+
+        private PrintWriter internalWriter;
+
+        @Override
+        public void open(String uId) throws Exception {
+          Path path = new Path(resultPath + "/" + uId);
+          FileSystem.get(new URI("file:///")).create(path, false);
+          internalWriter = new PrintWriter(new File(path.toUri()));
+        }
+
+        @Override
+        public void write(String value) throws Exception {
+          internalWriter.println(value);
+        }
+
+        @Override
+        public String close() throws Exception {
+          internalWriter.close();
+          return resultPath;
+        }
+
+        @Override
+        public WriteOperation<String, String> getWriteOperation() {
+          return MyWriteOperation.this;
+        }
+      }
+    }
+  }
 
 }
 

Reply via email to