http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/3735316e/library/src/main/java/com/datatorrent/lib/streamquery/function/FirstLastFunction.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/com/datatorrent/lib/streamquery/function/FirstLastFunction.java b/library/src/main/java/com/datatorrent/lib/streamquery/function/FirstLastFunction.java
index 0a4fd1a..db2c2b7 100644
--- a/library/src/main/java/com/datatorrent/lib/streamquery/function/FirstLastFunction.java
+++ b/library/src/main/java/com/datatorrent/lib/streamquery/function/FirstLastFunction.java
@@ -51,7 +51,7 @@ public class FirstLastFunction extends FunctionIndex
    * @param  alias   Alias name for output.
    * @param  isFirst return first value if true.
    */
-  public FirstLastFunction(@NotNull String column,String alias, boolean isLast)
+  public FirstLastFunction(@NotNull String column, String alias, boolean isLast)
   {
     super(column, alias);
     isFirst = !isLast;
@@ -63,14 +63,20 @@ public class FirstLastFunction extends FunctionIndex
   @Override
   public Object compute(@NotNull ArrayList<Map<String, Object>> rows) throws Exception
   {
-    if (rows.size() == 0) return null;
+    if (rows.size() == 0) {
+      return null;
+    }
     if (isFirst) {
-      for (int i=0; i < rows.size(); i++) {
-        if (rows.get(i).get(column) != null) return rows.get(i).get(column);
+      for (int i = 0; i < rows.size(); i++) {
+        if (rows.get(i).get(column) != null) {
+          return rows.get(i).get(column);
+        }
       }
     } else {
-      for (int i= (rows.size()-1); i >= 0;  i--) {
-        if (rows.get(i).get(column) != null) return rows.get(i).get(column);
+      for (int i = (rows.size() - 1); i >= 0; i--) {
+        if (rows.get(i).get(column) != null) {
+          return rows.get(i).get(column);
+        }
       }
     }
     return null;
@@ -83,9 +89,11 @@ public class FirstLastFunction extends FunctionIndex
   @Override
   protected String aggregateName()
   {
-    if (!StringUtils.isEmpty(alias)) return alias;
+    if (!StringUtils.isEmpty(alias)) {
+      return alias;
+    }
     if (isFirst) {
-        return "FIRST(" + column + ")";
+      return "FIRST(" + column + ")";
     }
     return "LAST(" + column + ")";
   }

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/3735316e/library/src/main/java/com/datatorrent/lib/streamquery/function/FunctionIndex.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/com/datatorrent/lib/streamquery/function/FunctionIndex.java b/library/src/main/java/com/datatorrent/lib/streamquery/function/FunctionIndex.java
index 57376e4..918ca89 100644
--- a/library/src/main/java/com/datatorrent/lib/streamquery/function/FunctionIndex.java
+++ b/library/src/main/java/com/datatorrent/lib/streamquery/function/FunctionIndex.java
@@ -36,7 +36,7 @@ import javax.validation.constraints.NotNull;
  * @tags sql aggregate
  * @since 0.3.4
  */
-abstract public class FunctionIndex
+public abstract class FunctionIndex
 {
   /**
    * Column name.
@@ -64,13 +64,13 @@ abstract public class FunctionIndex
    * @param rows Tuple list over application window.
    * @return aggregate result object.
    */
-  abstract public Object compute(@NotNull ArrayList<Map<String, Object>> rows) throws Exception;
+  public abstract Object compute(@NotNull ArrayList<Map<String, Object>> rows) throws Exception;
 
   /**
    * Get aggregate output value name.
    * @return name string.
    */
-  abstract protected String aggregateName();
+  protected abstract String aggregateName();
 
   /**
    * Apply compute function to given rows and store result in collect by output value name.
@@ -78,10 +78,16 @@ abstract public class FunctionIndex
    */
   public void filter(ArrayList<Map<String, Object>> rows, Map<String, Object> collect) throws Exception
   {
-    if (rows == null) return;
+    if (rows == null) {
+      return;
+    }
     String name = column;
-    if (alias != null) name = alias;
-    if (name == null) name = aggregateName();
+    if (alias != null) {
+      name = alias;
+    }
+    if (name == null) {
+      name = aggregateName();
+    }
     collect.put(name, compute(rows));
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/3735316e/library/src/main/java/com/datatorrent/lib/streamquery/function/MaxMinFunction.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/com/datatorrent/lib/streamquery/function/MaxMinFunction.java b/library/src/main/java/com/datatorrent/lib/streamquery/function/MaxMinFunction.java
index 596e080..f02e82c 100644
--- a/library/src/main/java/com/datatorrent/lib/streamquery/function/MaxMinFunction.java
+++ b/library/src/main/java/com/datatorrent/lib/streamquery/function/MaxMinFunction.java
@@ -67,8 +67,10 @@ public class MaxMinFunction extends FunctionIndex
     double minMax = 0.0;
     for (Map<String, Object> row : rows) {
       double value = ((Number)row.get(column)).doubleValue();
-      if ((isMax && (minMax < value))||(!isMax && (minMax > value))) minMax = value;
-     }
+      if ((isMax && (minMax < value)) || (!isMax && (minMax > value))) {
+        minMax = value;
+      }
+    }
     return minMax;
   }
 
@@ -79,8 +81,12 @@ public class MaxMinFunction extends FunctionIndex
   @Override
   protected String aggregateName()
   {
-    if (!StringUtils.isEmpty(alias)) return alias;
-    if (isMax) return "MAX(" + column + ")";
+    if (!StringUtils.isEmpty(alias)) {
+      return alias;
+    }
+    if (isMax) {
+      return "MAX(" + column + ")";
+    }
     return "MIN(" + column + ")";
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/3735316e/library/src/main/java/com/datatorrent/lib/streamquery/function/SumFunction.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/com/datatorrent/lib/streamquery/function/SumFunction.java b/library/src/main/java/com/datatorrent/lib/streamquery/function/SumFunction.java
index 484aa95..02186cd 100644
--- a/library/src/main/java/com/datatorrent/lib/streamquery/function/SumFunction.java
+++ b/library/src/main/java/com/datatorrent/lib/streamquery/function/SumFunction.java
@@ -45,8 +45,10 @@ public class SumFunction extends FunctionIndex
   {
     Double result = 0.0;
     for (Map<String, Object> row : rows) {
-        if (!row.containsKey(column)) continue;
-        result += ((Number)row.get(column)).doubleValue();
+      if (!row.containsKey(column)) {
+        continue;
+      }
+      result += ((Number)row.get(column)).doubleValue();
     }
     return result;
   }
@@ -54,7 +56,7 @@ public class SumFunction extends FunctionIndex
   @Override
   protected String aggregateName()
   {
-   return "Sum(" + column;
+    return "Sum(" + column;
   }
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/3735316e/library/src/main/java/com/datatorrent/lib/streamquery/index/BinaryExpression.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/com/datatorrent/lib/streamquery/index/BinaryExpression.java b/library/src/main/java/com/datatorrent/lib/streamquery/index/BinaryExpression.java
index 23fa86b..21c1d11 100644
--- a/library/src/main/java/com/datatorrent/lib/streamquery/index/BinaryExpression.java
+++ b/library/src/main/java/com/datatorrent/lib/streamquery/index/BinaryExpression.java
@@ -30,7 +30,7 @@ import javax.validation.constraints.NotNull;
  * @tags alias
  * @since 0.3.4
  */
-abstract public class BinaryExpression  implements Index
+public abstract class BinaryExpression  implements Index
 {
   /**
    * Left column name argument for expression.
@@ -50,9 +50,9 @@ abstract public class BinaryExpression  implements Index
   protected String alias;
 
   /**
-   * @param Left column name argument for expression.
-   * @param Right column name argument for expression.
-   * @param Alias name for output field.
+   * @param left column name argument for expression.
+   * @param right column name argument for expression.
+   * @param alias name for output field.
    */
   public BinaryExpression(@NotNull String left, @NotNull String right, String alias)
   {

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/3735316e/library/src/main/java/com/datatorrent/lib/streamquery/index/ColumnIndex.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/com/datatorrent/lib/streamquery/index/ColumnIndex.java b/library/src/main/java/com/datatorrent/lib/streamquery/index/ColumnIndex.java
index 78cc547..a4ad2b7 100644
--- a/library/src/main/java/com/datatorrent/lib/streamquery/index/ColumnIndex.java
+++ b/library/src/main/java/com/datatorrent/lib/streamquery/index/ColumnIndex.java
@@ -53,7 +53,9 @@ public class ColumnIndex implements Index
   public void filter(@NotNull  Map<String, Object> row, @NotNull  Map<String, Object> collect)
   {
     String name = getColumn();
-    if (alias != null) name = alias;
+    if (alias != null) {
+      name = alias;
+    }
     collect.put(name, row.get(name));
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/3735316e/library/src/main/java/com/datatorrent/lib/streamquery/index/Index.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/com/datatorrent/lib/streamquery/index/Index.java b/library/src/main/java/com/datatorrent/lib/streamquery/index/Index.java
index 890185b..5067d00 100644
--- a/library/src/main/java/com/datatorrent/lib/streamquery/index/Index.java
+++ b/library/src/main/java/com/datatorrent/lib/streamquery/index/Index.java
@@ -35,5 +35,5 @@ public interface Index
   /**
    * Function can key/value hash map, does metric implemented by sub class.
    */
-       public void filter(@NotNull Map<String,Object> row, @NotNull Map<String, Object> collect);
+  public void filter(@NotNull Map<String, Object> row, @NotNull Map<String, Object> collect);
 }

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/3735316e/library/src/main/java/com/datatorrent/lib/streamquery/index/MidIndex.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/com/datatorrent/lib/streamquery/index/MidIndex.java b/library/src/main/java/com/datatorrent/lib/streamquery/index/MidIndex.java
index 89456b2..931ddaa 100644
--- a/library/src/main/java/com/datatorrent/lib/streamquery/index/MidIndex.java
+++ b/library/src/main/java/com/datatorrent/lib/streamquery/index/MidIndex.java
@@ -38,22 +38,26 @@ public class MidIndex extends ColumnIndex
   public MidIndex(@NotNull String column, String alias, int start)
   {
     super(column, alias);
-    assert(start >= 0);
+    assert (start >= 0);
     this.start = start;
   }
 
   @Override
   public void filter(@NotNull  Map<String, Object> row, @NotNull  Map<String, Object> collect)
   {
-    if (!row.containsKey(column)) return;
+    if (!row.containsKey(column)) {
+      return;
+    }
     if (!(row.get(column) instanceof String)) {
-      assert(false);
+      assert (false);
     }
     String name = getColumn();
-    if (alias != null) name = alias;
+    if (alias != null) {
+      name = alias;
+    }
 
     int endIndex = start + length;
-    if ((length == 0)||(endIndex > ((String)row.get(column)).length())) {
+    if ((length == 0) || (endIndex > ((String)row.get(column)).length())) {
       collect.put(name, row.get(column));
     } else {
       collect.put(name, ((String)row.get(column)).substring(start, endIndex));
@@ -67,7 +71,7 @@ public class MidIndex extends ColumnIndex
 
   public void setLength(int length)
   {
-    assert(length > 0);
+    assert (length > 0);
     this.length = length;
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/3735316e/library/src/main/java/com/datatorrent/lib/streamquery/index/NegateExpression.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/com/datatorrent/lib/streamquery/index/NegateExpression.java b/library/src/main/java/com/datatorrent/lib/streamquery/index/NegateExpression.java
index 792ad80..969e3af 100644
--- a/library/src/main/java/com/datatorrent/lib/streamquery/index/NegateExpression.java
+++ b/library/src/main/java/com/datatorrent/lib/streamquery/index/NegateExpression.java
@@ -40,7 +40,9 @@ public class NegateExpression extends UnaryExpression
   public NegateExpression(@Null String column, String alias)
   {
     super(column, alias);
-    if (this.alias == null)  this.alias = "NEGATE(" + column + ")";
+    if (this.alias == null) {
+      this.alias = "NEGATE(" + column + ")";
+    }
   }
 
   /* (non-Javadoc)
@@ -49,7 +51,9 @@ public class NegateExpression extends UnaryExpression
   @Override
   public void filter(Map<String, Object> row, Map<String, Object> collect)
   {
-    if (!row.containsKey(column)) return;
+    if (!row.containsKey(column)) {
+      return;
+    }
     collect.put(alias, -((Number)row.get(column)).doubleValue());
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/3735316e/library/src/main/java/com/datatorrent/lib/streamquery/index/RoundDoubleIndex.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/com/datatorrent/lib/streamquery/index/RoundDoubleIndex.java b/library/src/main/java/com/datatorrent/lib/streamquery/index/RoundDoubleIndex.java
index 46a563f..90e16a1 100644
--- a/library/src/main/java/com/datatorrent/lib/streamquery/index/RoundDoubleIndex.java
+++ b/library/src/main/java/com/datatorrent/lib/streamquery/index/RoundDoubleIndex.java
@@ -37,17 +37,23 @@ public class RoundDoubleIndex  extends ColumnIndex
   {
     super(column, alias);
     rounder = 1;
-    if (numDecimals > 0) rounder = (int) Math.pow(10, numDecimals);
+    if (numDecimals > 0) {
+      rounder = (int)Math.pow(10, numDecimals);
+    }
   }
 
   @Override
   public void filter(@NotNull  Map<String, Object> row, @NotNull  Map<String, Object> collect)
   {
-    if (!row.containsKey(column)) return;
-    double value = (Double) row.get(column);
-    value = Math.round(value * rounder)/rounder;
+    if (!row.containsKey(column)) {
+      return;
+    }
+    double value = (Double)row.get(column);
+    value = Math.round(value * rounder) / rounder;
     String name = getColumn();
-    if (alias != null) name = alias;
+    if (alias != null) {
+      name = alias;
+    }
     collect.put(name, value);
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/3735316e/library/src/main/java/com/datatorrent/lib/streamquery/index/StringCaseIndex.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/com/datatorrent/lib/streamquery/index/StringCaseIndex.java b/library/src/main/java/com/datatorrent/lib/streamquery/index/StringCaseIndex.java
index 2d792ff..2c49a79 100644
--- a/library/src/main/java/com/datatorrent/lib/streamquery/index/StringCaseIndex.java
+++ b/library/src/main/java/com/datatorrent/lib/streamquery/index/StringCaseIndex.java
@@ -42,13 +42,17 @@ public class StringCaseIndex extends  ColumnIndex
   @Override
   public void filter(@NotNull  Map<String, Object> row, @NotNull  Map<String, Object> collect)
   {
-    if (!row.containsKey(column)) return;
+    if (!row.containsKey(column)) {
+      return;
+    }
     if (!(row.get(column) instanceof String)) {
-      assert(false);
+      assert (false);
     }
 
     String name = getColumn();
-    if (alias != null) name = alias;
+    if (alias != null) {
+      name = alias;
+    }
     if (toUpperCase) {
       collect.put(name, ((String)row.get(column)).toUpperCase());
     } else {

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/3735316e/library/src/main/java/com/datatorrent/lib/streamquery/index/StringLenIndex.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/com/datatorrent/lib/streamquery/index/StringLenIndex.java b/library/src/main/java/com/datatorrent/lib/streamquery/index/StringLenIndex.java
index 4fa05b5..4dbfee1 100644
--- a/library/src/main/java/com/datatorrent/lib/streamquery/index/StringLenIndex.java
+++ b/library/src/main/java/com/datatorrent/lib/streamquery/index/StringLenIndex.java
@@ -40,13 +40,17 @@ public class StringLenIndex  extends ColumnIndex
   @Override
   public void filter(@NotNull  Map<String, Object> row, @NotNull  Map<String, Object> collect)
   {
-    if (!row.containsKey(column)) return;
+    if (!row.containsKey(column)) {
+      return;
+    }
     if (!(row.get(column) instanceof String)) {
-      assert(false);
+      assert (false);
     }
 
     String name = getColumn();
-    if (alias != null) name = alias;
+    if (alias != null) {
+      name = alias;
+    }
     collect.put(name, ((String)row.get(column)).length());
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/3735316e/library/src/main/java/com/datatorrent/lib/streamquery/index/SumExpression.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/com/datatorrent/lib/streamquery/index/SumExpression.java b/library/src/main/java/com/datatorrent/lib/streamquery/index/SumExpression.java
index acddf51..a0144da 100644
--- a/library/src/main/java/com/datatorrent/lib/streamquery/index/SumExpression.java
+++ b/library/src/main/java/com/datatorrent/lib/streamquery/index/SumExpression.java
@@ -36,14 +36,16 @@ public class SumExpression extends BinaryExpression
 {
 
   /**
-   * @param Left column name argument for expression.
-   * @param Right column name argument for expression.
-   * @param Alias name for output field.
+   * @param left column name argument for expression.
+   * @param right column name argument for expression.
+   * @param alias name for output field.
    */
   public SumExpression(@NotNull String left, @NotNull String right, String alias)
   {
     super(left, right, alias);
-    if (this.alias == null) this.alias = "SUM(" + left + "," + right + ")";
+    if (this.alias == null) {
+      this.alias = "SUM(" + left + "," + right + ")";
+    }
   }
 
   /* sum column values.
@@ -52,7 +54,9 @@ public class SumExpression extends BinaryExpression
   @Override
   public void filter(Map<String, Object> row, Map<String, Object> collect)
   {
-    if (!row.containsKey(left) || !row.containsKey(right)) return;
+    if (!row.containsKey(left) || !row.containsKey(right)) {
+      return;
+    }
     collect.put(alias, ((Number)row.get(left)).doubleValue() + ((Number)row.get(right)).doubleValue());
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/3735316e/library/src/main/java/com/datatorrent/lib/streamquery/index/UnaryExpression.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/com/datatorrent/lib/streamquery/index/UnaryExpression.java b/library/src/main/java/com/datatorrent/lib/streamquery/index/UnaryExpression.java
index ea52986..45e90ec 100644
--- a/library/src/main/java/com/datatorrent/lib/streamquery/index/UnaryExpression.java
+++ b/library/src/main/java/com/datatorrent/lib/streamquery/index/UnaryExpression.java
@@ -31,7 +31,7 @@ import javax.validation.constraints.NotNull;
  * @tags unary, alias
  * @since 0.3.4
  */
-abstract public class UnaryExpression  implements Index
+public abstract class UnaryExpression  implements Index
 {
   /**
    * Column name argument for unary expression.
@@ -45,8 +45,8 @@ abstract public class UnaryExpression  implements Index
   protected String alias;
 
   /**
-   * @param Column name argument for unary expression.
-   * @param Alias name for output field.
+   * @param column name argument for unary expression.
+   * @param alias name for output field.
    */
   public UnaryExpression(@NotNull String column, String alias)
   {

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/3735316e/library/src/main/java/com/datatorrent/lib/testbench/ArrayListTestSink.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/com/datatorrent/lib/testbench/ArrayListTestSink.java b/library/src/main/java/com/datatorrent/lib/testbench/ArrayListTestSink.java
index c6df099..93889be 100644
--- a/library/src/main/java/com/datatorrent/lib/testbench/ArrayListTestSink.java
+++ b/library/src/main/java/com/datatorrent/lib/testbench/ArrayListTestSink.java
@@ -18,12 +18,13 @@
  */
 package com.datatorrent.lib.testbench;
 
-import com.datatorrent.api.Sink;
-
 import java.util.ArrayList;
 import java.util.HashMap;
+
 import org.apache.commons.lang.mutable.MutableInt;
 
+import com.datatorrent.api.Sink;
+
 /**
  * A sink implementation to collect expected test results in a HashMap.
  * <p>
@@ -61,7 +62,7 @@ public class ArrayListTestSink<T> implements Sink<T>
   {
     this.count++;
     @SuppressWarnings("unchecked")
-    ArrayList<Object> list = (ArrayList<Object>) tuple;
+    ArrayList<Object> list = (ArrayList<Object>)tuple;
     for (Object o: list) {
       MutableInt val = map.get(o);
       if (val == null) {

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/3735316e/library/src/main/java/com/datatorrent/lib/testbench/CollectorTestSink.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/com/datatorrent/lib/testbench/CollectorTestSink.java b/library/src/main/java/com/datatorrent/lib/testbench/CollectorTestSink.java
index 778a82a..2fd776e 100644
--- a/library/src/main/java/com/datatorrent/lib/testbench/CollectorTestSink.java
+++ b/library/src/main/java/com/datatorrent/lib/testbench/CollectorTestSink.java
@@ -18,11 +18,11 @@
  */
 package com.datatorrent.lib.testbench;
 
-import com.datatorrent.api.Sink;
-
 import java.util.ArrayList;
 import java.util.List;
 
+import com.datatorrent.api.Sink;
+
 /**
  * A sink implementation to collect expected test results.
  * <p>
@@ -33,7 +33,7 @@ import java.util.List;
  */
 public class CollectorTestSink<T> implements Sink<T>
 {
-  final public List<T> collectedTuples = new ArrayList<T>();
+  public final List<T> collectedTuples = new ArrayList<T>();
 
   /**
    * clears data
@@ -46,10 +46,10 @@ public class CollectorTestSink<T> implements Sink<T>
   @Override
   public void put(T payload)
   {
-      synchronized (collectedTuples) {
-        collectedTuples.add(payload);
-        collectedTuples.notifyAll();
-      }
+    synchronized (collectedTuples) {
+      collectedTuples.add(payload);
+      collectedTuples.notifyAll();
+    }
   }
 
   public void waitForResultCount(int count, long timeoutMillis) throws InterruptedException

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/3735316e/library/src/main/java/com/datatorrent/lib/testbench/CompareFilterTuples.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/com/datatorrent/lib/testbench/CompareFilterTuples.java b/library/src/main/java/com/datatorrent/lib/testbench/CompareFilterTuples.java
index d338d34..a309b89 100644
--- a/library/src/main/java/com/datatorrent/lib/testbench/CompareFilterTuples.java
+++ b/library/src/main/java/com/datatorrent/lib/testbench/CompareFilterTuples.java
@@ -21,9 +21,9 @@ package com.datatorrent.lib.testbench;
 import java.util.HashMap;
 import java.util.Map;
 
-import com.datatorrent.common.util.BaseOperator;
 import com.datatorrent.api.DefaultInputPort;
 import com.datatorrent.api.DefaultOutputPort;
+import com.datatorrent.common.util.BaseOperator;
 
 /**
  * <p>Implements Compare Filter Tuples class.</p>
@@ -35,9 +35,12 @@ import com.datatorrent.api.DefaultOutputPort;
  */
 public class CompareFilterTuples<k> extends BaseOperator
 {
-       // Compare type function
+  // Compare type function
   private Compare compareType = Compare.Equal;
-  public enum Compare { Smaller, Equal, Greater }
+  public enum Compare
+  {
+    Smaller, Equal, Greater
+  }
 
   /**
    * Compare the incoming value with the Property value.
@@ -45,69 +48,85 @@ public class CompareFilterTuples<k> extends BaseOperator
   */
   public void setCompareType(Compare type)
   {
-       compareType = type;
+    compareType = type;
   }
 
   // compare value
   private int value;
   public void setValue(int value)
   {
-       this.value = value;
+    this.value = value;
   }
 
   // Collected result tuples
   private Map<k, Integer> result;
 
-        /**
-        * Input port that takes a map of integer values.
-        */
-       public final transient DefaultInputPort<Map<k, Integer>> inport = new DefaultInputPort<Map<k, Integer>>() {
+  /**
+   * Input port that takes a map of integer values.
+   */
+
+  public final transient DefaultInputPort<Map<k, Integer>> inport = new DefaultInputPort<Map<k, Integer>>()
+  {
     @Override
-    public void process(Map<k, Integer> map) {
-       for(Map.Entry<k, Integer> entry : map.entrySet())
-       {
-               if ( compareType == Compare.Equal ) if(entry.getValue().intValue() == value) result.put(entry.getKey(), entry.getValue());
-               if ( compareType == Compare.Greater ) if(entry.getValue().intValue() > value) result.put(entry.getKey(), entry.getValue());
-               if ( compareType == Compare.Smaller ) if(entry.getValue().intValue() < value) result.put(entry.getKey(), entry.getValue());
-       }
+    public void process(Map<k, Integer> map)
+    {
+      for (Map.Entry<k, Integer> entry : map.entrySet()) {
+        if (compareType == Compare.Equal) {
+          if (entry.getValue().intValue() == value) {
+            result.put(entry.getKey(),
+                entry.getValue());
+          }
+        }
+        if (compareType == Compare.Greater) {
+          if (entry.getValue().intValue() > value) {
+            result.put(entry.getKey(),
+                entry.getValue());
+          }
+        }
+        if (compareType == Compare.Smaller) {
+          if (entry.getValue().intValue() < value) {
+            result.put(entry.getKey(),
+                entry.getValue());
+          }
+        }
+      }
     }
-       };
+  };
 
-       /**
-        * Output port that emits a map of integer values.
-        */
-       public final transient DefaultOutputPort<Map<k, Integer>> outport = new DefaultOutputPort<Map<k, Integer>>();
+  /**
+   * Output port that emits a map of integer values.
+   */
+  public final transient DefaultOutputPort<Map<k, Integer>> outport = new DefaultOutputPort<Map<k, Integer>>();
 
         /**
-        * Output redis port that emits a map of &lt;integer,string&gt; values.
-        */
-       public final transient DefaultOutputPort<Map<Integer, String>> redisport = new DefaultOutputPort<Map<Integer, String>>();
+   * Output redis port that emits a map of &lt;integer,string&gt; values.
+   */
+  public final transient DefaultOutputPort<Map<Integer, String>> redisport = new DefaultOutputPort<Map<Integer, String>>();
 
-       @Override
-       public void beginWindow(long windowId)
-       {
-               result  = new HashMap<k, Integer>();
-       }
+  @Override
+  public void beginWindow(long windowId)
+  {
+    result  = new HashMap<k, Integer>();
+  }
 
-       @Override
-       public void endWindow()
-       {
-               outport.emit(result);
+  @Override
+  public void endWindow()
+  {
+    outport.emit(result);
 
-               int numOuts = 1;
-               Integer total = 0;
-               for (Map.Entry<k, Integer>  entry : result.entrySet())
-               {
-                       Map<Integer, String> tuple = new HashMap<Integer, String>();
-                       tuple.put(numOuts++, entry.getKey().toString());
-                       redisport.emit(tuple);
-                       total += entry.getValue();
-               }
-               Map<Integer, String> tuple = new HashMap<Integer, String>();
-               tuple.put(numOuts++, total.toString());
-               redisport.emit(tuple);
-               tuple = new HashMap<Integer, String>();
-               tuple.put(0, new Integer(numOuts).toString());
-               redisport.emit(tuple);
-       }
+    int numOuts = 1;
+    Integer total = 0;
+    for (Map.Entry<k, Integer>  entry : result.entrySet()) {
+      Map<Integer, String> tuple = new HashMap<Integer, String>();
+      tuple.put(numOuts++, entry.getKey().toString());
+      redisport.emit(tuple);
+      total += entry.getValue();
+    }
+    Map<Integer, String> tuple = new HashMap<Integer, String>();
+    tuple.put(numOuts++, total.toString());
+    redisport.emit(tuple);
+    tuple = new HashMap<Integer, String>();
+    tuple.put(0, new Integer(numOuts).toString());
+    redisport.emit(tuple);
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/3735316e/library/src/main/java/com/datatorrent/lib/testbench/CountAndLastTupleTestSink.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/com/datatorrent/lib/testbench/CountAndLastTupleTestSink.java b/library/src/main/java/com/datatorrent/lib/testbench/CountAndLastTupleTestSink.java
index 9e4f8b5..73506ae 100644
--- a/library/src/main/java/com/datatorrent/lib/testbench/CountAndLastTupleTestSink.java
+++ b/library/src/main/java/com/datatorrent/lib/testbench/CountAndLastTupleTestSink.java
@@ -43,7 +43,7 @@ public class CountAndLastTupleTestSink<T> extends CountTestSink<T>
   @Override
   public void put(T tuple)
   {
-      this.tuple = tuple;
-      count++;
+    this.tuple = tuple;
+    count++;
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/3735316e/library/src/main/java/com/datatorrent/lib/testbench/CountOccurance.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/com/datatorrent/lib/testbench/CountOccurance.java b/library/src/main/java/com/datatorrent/lib/testbench/CountOccurance.java
index 29343c9..9f6df10 100644
--- a/library/src/main/java/com/datatorrent/lib/testbench/CountOccurance.java
+++ b/library/src/main/java/com/datatorrent/lib/testbench/CountOccurance.java
@@ -22,10 +22,10 @@ import java.util.Date;
 import java.util.HashMap;
 import java.util.Map;
 
-import com.datatorrent.common.util.BaseOperator;
+import com.datatorrent.api.Context.OperatorContext;
 import com.datatorrent.api.DefaultInputPort;
 import com.datatorrent.api.DefaultOutputPort;
-import com.datatorrent.api.Context.OperatorContext;
+import com.datatorrent.common.util.BaseOperator;
 
 /**
  * <p>A base implementation of an operator which does count occurrence.</p>
@@ -37,70 +37,70 @@ import com.datatorrent.api.Context.OperatorContext;
  */
 public class CountOccurance<k> extends BaseOperator
 {
-       private Map<k, Integer> collect;
-       public final transient DefaultInputPort<k> inport = new 
DefaultInputPort<k>() {
+  private Map<k, Integer> collect;
+  public final transient DefaultInputPort<k> inport = new DefaultInputPort<k>()
+  {
     @Override
-    public void process(k s) {
-       if (collect.containsKey(s))
-       {
-               Integer value = (Integer)collect.remove(s);
-               collect.put(s, new Integer(value+1));
-       } else {
-               collect.put(s, new Integer(1));
-       }
+    public void process(k s)
+    {
+      if (collect.containsKey(s)) {
+        Integer value = (Integer)collect.remove(s);
+        collect.put(s, new Integer(value + 1));
+      } else {
+        collect.put(s, new Integer(1));
+      }
     }
-       };
+  };
 
-       @Override
-       public void setup(OperatorContext context)
-       {
-       }
+  @Override
+  public void setup(OperatorContext context)
+  {
+  }
 
-       @Override
-       public void teardown()
-       {
-       }
+  @Override
+  public void teardown()
+  {
+  }
 
-       @Override
-       public void beginWindow(long windowId)
-       {
-               collect  = new HashMap<k, Integer>();
-       }
+  @Override
+  public void beginWindow(long windowId)
+  {
+    collect  = new HashMap<k, Integer>();
+  }
 
-       /**
-        * Output port that emits a map of integer values.
-        */
-       public final transient DefaultOutputPort<Map<k, Integer>> outport = new 
DefaultOutputPort<Map<k, Integer>>();
+  /**
+   * Output port that emits a map of integer values.
+   */
+  public final transient DefaultOutputPort<Map<k, Integer>> outport = new 
DefaultOutputPort<Map<k, Integer>>();
 
         /**
-        * Output dimensions port that emits a map of &lt;string,object&gt; 
values.
-        */
-       public final transient DefaultOutputPort<Map<String, Object>> 
dimensionOut = new DefaultOutputPort<Map<String, Object>>();
+   * Output dimensions port that emits a map of &lt;string,object&gt; values.
+   */
+  public final transient DefaultOutputPort<Map<String, Object>> dimensionOut = 
new DefaultOutputPort<Map<String, Object>>();
 
-        /**
-        * Output total port that emits a map of &lt;string,integer&gt; count 
values.
-        */
-        public final transient DefaultOutputPort<Map<String,Integer>> total = 
new DefaultOutputPort<Map<String,Integer>>();
+  /**
+   * Output total port that emits a map of &lt;string,integer&gt; count values.
+   */
+  public final transient DefaultOutputPort<Map<String, Integer>> total = new 
DefaultOutputPort<Map<String, Integer>>();
 
-       @Override
-       public void endWindow()
-       {
-               outport.emit(collect);
-               long timestamp = new Date().getTime();
-               int allcount = 0;
-               for(Map.Entry<k, Integer> entry : collect.entrySet())
-               {
-                       Map<String, Object> map = new HashMap<String, Object>();
-                       map.put("timestamp", timestamp);
-                       map.put("item", entry.getKey());
-                       map.put("view", entry.getValue());
-                       dimensionOut.emit(map);
-                       allcount += entry.getValue();
-               }
-               Map<String, Integer> map = new HashMap<String, Integer>();
-               map.put("total", new Integer(allcount));
-               total.emit(map);
-               collect = null;
-               collect  = new HashMap<k, Integer>();
-       }
+  @Override
+  public void endWindow()
+  {
+    outport.emit(collect);
+    long timestamp = new Date().getTime();
+    int allcount = 0;
+    for (Map.Entry<k, Integer> entry : collect.entrySet()) {
+      Map<String, Object> map = new HashMap<String, Object>();
+      map.put("timestamp", timestamp);
+      map.put("item", entry.getKey());
+      map.put("view", entry.getValue());
+      dimensionOut.emit(map);
+      allcount += entry.getValue();
+    }
+    Map<String, Integer> map = new HashMap<String, Integer>();
+    map.put("total", new Integer(allcount));
+    total.emit(map);
+    collect = null;
+    collect  = new HashMap<k, Integer>();
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/3735316e/library/src/main/java/com/datatorrent/lib/testbench/CountTestSink.java
----------------------------------------------------------------------
diff --git 
a/library/src/main/java/com/datatorrent/lib/testbench/CountTestSink.java 
b/library/src/main/java/com/datatorrent/lib/testbench/CountTestSink.java
index cc7a70b..e6a85c9 100644
--- a/library/src/main/java/com/datatorrent/lib/testbench/CountTestSink.java
+++ b/library/src/main/java/com/datatorrent/lib/testbench/CountTestSink.java
@@ -43,12 +43,11 @@ public class CountTestSink<T> extends CollectorTestSink<T>
   }
 
   /**
-   *
    * @param payload
    */
   @Override
   public void put(T payload)
   {
-      count++;
+    count++;
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/3735316e/library/src/main/java/com/datatorrent/lib/testbench/EventClassifier.java
----------------------------------------------------------------------
diff --git 
a/library/src/main/java/com/datatorrent/lib/testbench/EventClassifier.java 
b/library/src/main/java/com/datatorrent/lib/testbench/EventClassifier.java
index db02f22..547340e 100644
--- a/library/src/main/java/com/datatorrent/lib/testbench/EventClassifier.java
+++ b/library/src/main/java/com/datatorrent/lib/testbench/EventClassifier.java
@@ -18,16 +18,16 @@
  */
 package com.datatorrent.lib.testbench;
 
-import com.datatorrent.common.util.BaseOperator;
-import com.datatorrent.api.DefaultInputPort;
-import com.datatorrent.api.DefaultOutputPort;
-import com.datatorrent.api.Context.OperatorContext;
-
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.Random;
 
+import com.datatorrent.api.Context.OperatorContext;
+import com.datatorrent.api.DefaultInputPort;
+import com.datatorrent.api.DefaultOutputPort;
+import com.datatorrent.common.util.BaseOperator;
+
 /**
  * An implementation of BaseOperator that creates a load with pair of keys by 
taking in an input stream event and adding to incoming keys
  * to create a new tuple that is emitted on output port data.
@@ -72,51 +72,47 @@ public class EventClassifier extends BaseOperator
     @Override
     public void process(HashMap<String, Double> tuple)
     {
-    for (Map.Entry<String, Double> e: tuple.entrySet()) {
-      String inkey = e.getKey();
-      ArrayList<Integer> alist = null;
-      if (inkeys != null) {
-        alist = inkeys.get(e.getKey());
-      }
-      if (alist == null) {
-        alist = noweight;
-      }
-
-      // now alist are the weights
-      int rval = random.nextInt(alist.get(alist.size() - 1));
-      int j = 0;
-      int wval = 0;
-      for (Integer ew: alist) {
-        wval += ew.intValue();
-        if (wval >= rval) {
-          break;
-        }
-        j++;
-      }
-      HashMap<String, Double> otuple = new HashMap<String, Double>(1);
-      String key = wtostr_index.get(j); // the key
-      Double keyval = null;
-      if (hasvalues) {
-        if (voper == value_operation.VOPR_REPLACE) { // replace the incoming 
value
-          keyval = keys.get(key);
+      for (Map.Entry<String, Double> e : tuple.entrySet()) {
+        String inkey = e.getKey();
+        ArrayList<Integer> alist = null;
+        if (inkeys != null) {
+          alist = inkeys.get(e.getKey());
         }
-        else if (voper == value_operation.VOPR_ADD) {
-          keyval = keys.get(key) + e.getValue();
+        if (alist == null) {
+          alist = noweight;
         }
-        else if (voper == value_operation.VOPR_MULT) {
-          keyval = keys.get(key) * e.getValue();
 
+        // now alist are the weights
+        int rval = random.nextInt(alist.get(alist.size() - 1));
+        int j = 0;
+        int wval = 0;
+        for (Integer ew : alist) {
+          wval += ew.intValue();
+          if (wval >= rval) {
+            break;
+          }
+          j++;
         }
-        else if (voper == value_operation.VOPR_APPEND) { // not supported yet
-          keyval = keys.get(key);
+        HashMap<String, Double> otuple = new HashMap<String, Double>(1);
+        String key = wtostr_index.get(j); // the key
+        Double keyval = null;
+        if (hasvalues) {
+          if (voper == value_operation.VOPR_REPLACE) { // replace the incoming 
value
+            keyval = keys.get(key);
+          } else if (voper == value_operation.VOPR_ADD) {
+            keyval = keys.get(key) + e.getValue();
+          } else if (voper == value_operation.VOPR_MULT) {
+            keyval = keys.get(key) * e.getValue();
+
+          } else if (voper == value_operation.VOPR_APPEND) { // not supported 
yet
+            keyval = keys.get(key);
+          }
+        } else { // pass on the value from incoming tuple
+          keyval = e.getValue();
         }
+        otuple.put(key + "," + inkey, keyval);
+        data.emit(otuple);
       }
-      else { // pass on the value from incoming tuple
-        keyval = e.getValue();
-      }
-      otuple.put(key + "," + inkey, keyval);
-      data.emit(otuple);
-    }
     }
   };
 
@@ -124,7 +120,6 @@ public class EventClassifier extends BaseOperator
    * Output data port that emits a hashmap of &lt;string,double&gt;.
    */
   public final transient DefaultOutputPort<HashMap<String, Double>> data = new 
DefaultOutputPort<HashMap<String, Double>>();
-;
 
   HashMap<String, Double> keys = new HashMap<String, Double>();
   HashMap<Integer, String> wtostr_index = new HashMap<Integer, String>();
@@ -139,7 +134,8 @@ public class EventClassifier extends BaseOperator
   enum value_operation
   {
     VOPR_REPLACE, VOPR_ADD, VOPR_MULT, VOPR_APPEND
-  };
+  }
+
   value_operation voper = value_operation.VOPR_REPLACE;
 
 
@@ -163,7 +159,7 @@ public class EventClassifier extends BaseOperator
     voper = value_operation.VOPR_MULT;
   }
 
-   public void setKeyWeights(HashMap<String, ArrayList<Integer>> map)
+  public void setKeyWeights(HashMap<String, ArrayList<Integer>> map)
   {
     if (inkeys == null) {
       inkeys = new HashMap<String, ArrayList<Integer>>();
@@ -183,7 +179,7 @@ public class EventClassifier extends BaseOperator
     }
   }
 
-   @Override
+  @Override
   public void setup(OperatorContext context)
   {
     noweight = new ArrayList<Integer>();

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/3735316e/library/src/main/java/com/datatorrent/lib/testbench/EventClassifierNumberToHashDouble.java
----------------------------------------------------------------------
diff --git 
a/library/src/main/java/com/datatorrent/lib/testbench/EventClassifierNumberToHashDouble.java
 
b/library/src/main/java/com/datatorrent/lib/testbench/EventClassifierNumberToHashDouble.java
index 0158f3b..66ad12e 100644
--- 
a/library/src/main/java/com/datatorrent/lib/testbench/EventClassifierNumberToHashDouble.java
+++ 
b/library/src/main/java/com/datatorrent/lib/testbench/EventClassifierNumberToHashDouble.java
@@ -18,16 +18,18 @@
  */
 package com.datatorrent.lib.testbench;
 
-import com.datatorrent.common.util.BaseOperator;
-import com.datatorrent.api.DefaultInputPort;
-import com.datatorrent.api.DefaultOutputPort;
-import com.datatorrent.api.Context.OperatorContext;
-
 import java.util.HashMap;
+
 import javax.validation.constraints.NotNull;
+
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import com.datatorrent.api.Context.OperatorContext;
+import com.datatorrent.api.DefaultInputPort;
+import com.datatorrent.api.DefaultOutputPort;
+import com.datatorrent.common.util.BaseOperator;
+
 /**
  * An implementation of BaseOperator that creates a load with pair of keys by 
taking in an input stream event and adding to incoming keys
  * to create a new tuple of Hashmap &lt;String,Double&gt; that is emitted on 
output port data.
@@ -95,7 +97,7 @@ public class EventClassifierNumberToHashDouble<K extends 
Number> extends BaseOpe
   int seed = 0;
   int seed_size = 1;
 
-  String [] keys = null;
+  String[] keys = null;
 
   /**
    * setup before dag is run (pre-runtime, and post compile time)
@@ -116,8 +118,7 @@ public class EventClassifierNumberToHashDouble<K extends 
Number> extends BaseOpe
         Integer ival = i;
         keys[i] = getKey() + ival.toString();
       }
-    }
-    else {
+    } else {
       for (int i = s_end; i <= s_start; i++) {
         Integer ival = i;
         keys[i] = getKey() + ival.toString();

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/3735316e/library/src/main/java/com/datatorrent/lib/testbench/EventGenerator.java
----------------------------------------------------------------------
diff --git 
a/library/src/main/java/com/datatorrent/lib/testbench/EventGenerator.java 
b/library/src/main/java/com/datatorrent/lib/testbench/EventGenerator.java
index 8dcf264..6b2ed8f 100644
--- a/library/src/main/java/com/datatorrent/lib/testbench/EventGenerator.java
+++ b/library/src/main/java/com/datatorrent/lib/testbench/EventGenerator.java
@@ -105,7 +105,7 @@ public class EventGenerator implements InputOperator
   HashMap<Integer, String> wtostr_index = new HashMap<Integer, String>();
   ArrayList<Integer> weights;
   int total_weight = 0;
-  private transient final Random random = new Random();
+  private final transient Random random = new Random();
   public static final int ROLLING_WINDOW_COUNT_DEFAULT = 1;
   @Min(1)
   private int rolling_window_count = ROLLING_WINDOW_COUNT_DEFAULT;
@@ -157,14 +157,12 @@ public class EventGenerator implements InputOperator
         }
         weights.add(Integer.parseInt(weightsArray[i]));
         total_weight += Integer.parseInt(weightsArray[i]);
-      }
-      else {
+      } else {
         total_weight += 1;
       }
       if ((valuesArray != null) && valuesArray.length != 0) {
         keys.put(s, new Double(Double.parseDouble(valuesArray[i])));
-      }
-      else {
+      } else {
         keys.put(s, 0.0);
       }
       wtostr_index.put(i, s);
@@ -197,8 +195,7 @@ public class EventGenerator implements InputOperator
       long average;
       if (rolling_window_count == 1) {
         average = (tcount * 1000) / elapsedTime;
-      }
-      else { // use tuple_numbers
+      } else { // use tuple_numbers
         int slots;
         if (count_denominator == rolling_window_count) {
           tuple_numbers[tuple_index] = tcount;
@@ -208,8 +205,7 @@ public class EventGenerator implements InputOperator
           if (tuple_index == rolling_window_count) {
             tuple_index = 0;
           }
-        }
-        else {
+        } else {
           tuple_numbers[count_denominator - 1] = tcount;
           time_numbers[count_denominator - 1] = elapsedTime;
           slots = count_denominator;
@@ -259,7 +255,7 @@ public class EventGenerator implements InputOperator
 
   /**
    * Comma separated strings which can be used as keys
-   * @param value
+   * @param keys
    */
   public void setKeysHelper(String keys)
   {
@@ -275,14 +271,13 @@ public class EventGenerator implements InputOperator
 
   /**
    * Comma separated values which are used as weight for the same indexed keys.
-   * @param value
+   * @param weight
    */
   public void setWeightsHelper(String weight)
   {
     if (weight.isEmpty()) {
       weightsArray = null;
-    }
-    else {
+    } else {
       weightsArray = weight.split(",");
     }
   }
@@ -300,8 +295,7 @@ public class EventGenerator implements InputOperator
   {
     if (value.isEmpty()) {
       valuesArray = null;
-    }
-    else {
+    } else {
       valuesArray = value.split(",");
     }
   }
@@ -341,8 +335,7 @@ public class EventGenerator implements InputOperator
           }
           j++;
         }
-      }
-      else {
+      } else {
         j++;
         j = j % keys.size();
       }

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/3735316e/library/src/main/java/com/datatorrent/lib/testbench/EventIncrementer.java
----------------------------------------------------------------------
diff --git 
a/library/src/main/java/com/datatorrent/lib/testbench/EventIncrementer.java 
b/library/src/main/java/com/datatorrent/lib/testbench/EventIncrementer.java
index 3668d5c..46b684d 100644
--- a/library/src/main/java/com/datatorrent/lib/testbench/EventIncrementer.java
+++ b/library/src/main/java/com/datatorrent/lib/testbench/EventIncrementer.java
@@ -18,14 +18,15 @@
  */
 package com.datatorrent.lib.testbench;
 
-import com.datatorrent.common.util.BaseOperator;
-import com.datatorrent.api.DefaultInputPort;
-import com.datatorrent.api.DefaultOutputPort;
-import com.datatorrent.lib.util.KeyValPair;
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.Map;
 
+import com.datatorrent.api.DefaultInputPort;
+import com.datatorrent.api.DefaultOutputPort;
+import com.datatorrent.common.util.BaseOperator;
+import com.datatorrent.lib.util.KeyValPair;
+
 /**
  * Creates a random movement by taking in a seed stream and incrementing this 
data.
  * <p>
@@ -74,8 +75,7 @@ public class EventIncrementer extends BaseOperator
         if (keys.length != e.getValue().size()) { // bad seed
           return;
           // emit error tuple here
-        }
-        else {
+        } else {
           ArrayList<KeyValPair<String, Double>> alist = new 
ArrayList<KeyValPair<String, Double>>(keys.length);
           int j = 0;
           for (Integer s: e.getValue()) {
@@ -189,8 +189,7 @@ public class EventIncrementer extends BaseOperator
     double range = high - low;
     if (increment > range) { // bad data, do nothing
       ret = current;
-    }
-    else {
+    } else {
       sign = sign * -1.0;
       ret += sign * increment;
       if (ret < low) {

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/3735316e/library/src/main/java/com/datatorrent/lib/testbench/FilterClassifier.java
----------------------------------------------------------------------
diff --git 
a/library/src/main/java/com/datatorrent/lib/testbench/FilterClassifier.java 
b/library/src/main/java/com/datatorrent/lib/testbench/FilterClassifier.java
index a8ec00b..389cbb3 100644
--- a/library/src/main/java/com/datatorrent/lib/testbench/FilterClassifier.java
+++ b/library/src/main/java/com/datatorrent/lib/testbench/FilterClassifier.java
@@ -18,16 +18,16 @@
  */
 package com.datatorrent.lib.testbench;
 
-import com.datatorrent.common.util.BaseOperator;
-import com.datatorrent.api.DefaultInputPort;
-import com.datatorrent.api.DefaultOutputPort;
-import com.datatorrent.api.Context.OperatorContext;
-
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.Random;
 
+import com.datatorrent.api.Context.OperatorContext;
+import com.datatorrent.api.DefaultInputPort;
+import com.datatorrent.api.DefaultOutputPort;
+import com.datatorrent.common.util.BaseOperator;
+
 /**
  * Filters the tuples as per the filter (pass through percent) and emits them.
  * <p>
@@ -83,8 +83,7 @@ public class FilterClassifier<T> extends BaseOperator
         ArrayList<Integer> alist;
         if (inkeys != null) {
           alist = inkeys.get(inkey);
-        }
-        else {
+        } else {
           alist = noweight;
         }
 

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/3735316e/library/src/main/java/com/datatorrent/lib/testbench/FilteredEventClassifier.java
----------------------------------------------------------------------
diff --git 
a/library/src/main/java/com/datatorrent/lib/testbench/FilteredEventClassifier.java
 
b/library/src/main/java/com/datatorrent/lib/testbench/FilteredEventClassifier.java
index 27708dc..77d3970 100644
--- 
a/library/src/main/java/com/datatorrent/lib/testbench/FilteredEventClassifier.java
+++ 
b/library/src/main/java/com/datatorrent/lib/testbench/FilteredEventClassifier.java
@@ -18,16 +18,16 @@
  */
 package com.datatorrent.lib.testbench;
 
-import com.datatorrent.common.util.BaseOperator;
-import com.datatorrent.api.DefaultInputPort;
-import com.datatorrent.api.DefaultOutputPort;
-import com.datatorrent.api.Context.OperatorContext;
-
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.Random;
 
+import com.datatorrent.api.Context.OperatorContext;
+import com.datatorrent.api.DefaultInputPort;
+import com.datatorrent.api.DefaultOutputPort;
+import com.datatorrent.common.util.BaseOperator;
+
 /**
  * This operator takes in a stream of tuples
  * and randomly emits them based on the specified total_filter and pass_filter 
values.&nbsp;
@@ -110,8 +110,7 @@ public class FilteredEventClassifier<T> extends BaseOperator
           T keyval;
           if (hasvalues) {
             keyval = keys.get(key);
-          }
-          else { // pass on the value from incoming tuple
+          } else { // pass on the value from incoming tuple
             keyval = e.getValue();
           }
           otuple.put(key + "," + inkey, keyval);

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/3735316e/library/src/main/java/com/datatorrent/lib/testbench/HashTestSink.java
----------------------------------------------------------------------
diff --git 
a/library/src/main/java/com/datatorrent/lib/testbench/HashTestSink.java 
b/library/src/main/java/com/datatorrent/lib/testbench/HashTestSink.java
index 63cbaf5..389ecb8 100644
--- a/library/src/main/java/com/datatorrent/lib/testbench/HashTestSink.java
+++ b/library/src/main/java/com/datatorrent/lib/testbench/HashTestSink.java
@@ -18,11 +18,12 @@
  */
 package com.datatorrent.lib.testbench;
 
-import com.datatorrent.api.Sink;
-
 import java.util.HashMap;
+
 import org.apache.commons.lang.mutable.MutableInt;
 
+import com.datatorrent.api.Sink;
+
 /**
  * A sink implementation, which counts the number of times each tuples is 
collected and stores the results in a hash map.
  * <p></p>
@@ -54,8 +55,7 @@ public class HashTestSink<T> implements Sink<T>
   {
     int ret = -1;
     MutableInt val = map.get(key);
-    if (val != null)
-    {
+    if (val != null) {
       ret = val.intValue();
     }
     return ret;
@@ -64,13 +64,13 @@ public class HashTestSink<T> implements Sink<T>
   @Override
   public void put(T tuple)
   {
-      this.count++;
-      MutableInt val = map.get(tuple);
-      if (val == null) {
-        val = new MutableInt(0);
-        map.put(tuple, val);
-      }
-      val.increment();
+    this.count++;
+    MutableInt val = map.get(tuple);
+    if (val == null) {
+      val = new MutableInt(0);
+      map.put(tuple, val);
+    }
+    val.increment();
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/3735316e/library/src/main/java/com/datatorrent/lib/testbench/HttpStatusFilter.java
----------------------------------------------------------------------
diff --git 
a/library/src/main/java/com/datatorrent/lib/testbench/HttpStatusFilter.java 
b/library/src/main/java/com/datatorrent/lib/testbench/HttpStatusFilter.java
index f24c6e4..08b190e 100644
--- a/library/src/main/java/com/datatorrent/lib/testbench/HttpStatusFilter.java
+++ b/library/src/main/java/com/datatorrent/lib/testbench/HttpStatusFilter.java
@@ -18,14 +18,13 @@
  */
 package com.datatorrent.lib.testbench;
 
-
 import java.util.HashMap;
 import java.util.Map;
 
-import com.datatorrent.common.util.BaseOperator;
+import com.datatorrent.api.Context.OperatorContext;
 import com.datatorrent.api.DefaultInputPort;
 import com.datatorrent.api.DefaultOutputPort;
-import com.datatorrent.api.Context.OperatorContext;
+import com.datatorrent.common.util.BaseOperator;
 
 /**
  * This operator consumes tuples which are key value pairs of strings.&nbsp;
@@ -39,62 +38,64 @@ import com.datatorrent.api.Context.OperatorContext;
  */
 public class HttpStatusFilter extends BaseOperator
 {
-       private String filterStatus;
-       private Map<String, Integer> collect;
-       public final transient DefaultInputPort<Map<String, String>> inport = 
new DefaultInputPort<Map<String, String>>() {
+  private String filterStatus;
+  private Map<String, Integer> collect;
+  public final transient DefaultInputPort<Map<String, String>> inport = new 
DefaultInputPort<Map<String, String>>()
+  {
     @Override
-    public void process(Map<String, String> s) {
-       for(Map.Entry<String, String> entry : s.entrySet())
-       {
-               if (!entry.getValue().equals(filterStatus)) continue;
-               if (collect.containsKey(entry.getKey()))
-               {
-                       Integer value = (Integer)collect.remove(entry.getKey());
-                       collect.put(entry.getKey(), new Integer(value+1));
-               } else {
-                       collect.put(entry.getKey(), new Integer(1));
-               }
-       }
+    public void process(Map<String, String> s)
+    {
+      for (Map.Entry<String, String> entry : s.entrySet()) {
+        if (!entry.getValue().equals(filterStatus)) {
+          continue;
+        }
+        if (collect.containsKey(entry.getKey())) {
+          Integer value = (Integer)collect.remove(entry.getKey());
+          collect.put(entry.getKey(), new Integer(value + 1));
+        } else {
+          collect.put(entry.getKey(), new Integer(1));
+        }
+      }
     }
-       };
+  };
 
-       @Override
-       public void setup(OperatorContext context)
-       {
-               collect  = new HashMap<String, Integer>();
-       }
+  @Override
+  public void setup(OperatorContext context)
+  {
+    collect  = new HashMap<String, Integer>();
+  }
 
-       @Override
-       public void teardown()
-       {
-       }
+  @Override
+  public void teardown()
+  {
+  }
 
-       @Override
-       public void beginWindow(long windowId)
-       {
-               collect  = new HashMap<String, Integer>();
-       }
+  @Override
+  public void beginWindow(long windowId)
+  {
+    collect  = new HashMap<String, Integer>();
+  }
 
-       // out port
-       public final transient DefaultOutputPort<Map<String, Integer>> outport 
= new DefaultOutputPort<Map<String, Integer>>();
+  // out port
+  public final transient DefaultOutputPort<Map<String, Integer>> outport = new 
DefaultOutputPort<Map<String, Integer>>();
 
-       @Override
-       public void endWindow()
-       {
-               outport.emit(collect);
-       }
+  @Override
+  public void endWindow()
+  {
+    outport.emit(collect);
+  }
 
-       public String getFilterStatus()
-       {
-               return filterStatus;
-       }
+  public String getFilterStatus()
+  {
+    return filterStatus;
+  }
 
-       /**
-        * Only key with the following value is counted.
-        * @param filterStatus
-        */
-       public void setFilterStatus(String filterStatus)
-       {
-               this.filterStatus = filterStatus;
-       }
+  /**
+   * Only key with the following value is counted.
+   * @param filterStatus
+   */
+  public void setFilterStatus(String filterStatus)
+  {
+    this.filterStatus = filterStatus;
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/3735316e/library/src/main/java/com/datatorrent/lib/testbench/KeyValSum.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/com/datatorrent/lib/testbench/KeyValSum.java 
b/library/src/main/java/com/datatorrent/lib/testbench/KeyValSum.java
index 0a26961..a6004dc 100644
--- a/library/src/main/java/com/datatorrent/lib/testbench/KeyValSum.java
+++ b/library/src/main/java/com/datatorrent/lib/testbench/KeyValSum.java
@@ -21,10 +21,10 @@ package com.datatorrent.lib.testbench;
 import java.util.HashMap;
 import java.util.Map;
 
-import com.datatorrent.common.util.BaseOperator;
+import com.datatorrent.api.Context.OperatorContext;
 import com.datatorrent.api.DefaultInputPort;
 import com.datatorrent.api.DefaultOutputPort;
-import com.datatorrent.api.Context.OperatorContext;
+import com.datatorrent.common.util.BaseOperator;
 
 /**
  * This operator consumes maps whose keys are strings and values are 
integers.&nbsp;
@@ -37,51 +37,51 @@ import com.datatorrent.api.Context.OperatorContext;
  */
 public class KeyValSum extends BaseOperator
 {
-       private Map<String, Integer> collect;
+  private Map<String, Integer> collect;
 
   /**
    * This input port on which tuples are received.
    */
-       public final transient DefaultInputPort<Map<String, Integer>> inport = 
new DefaultInputPort<Map<String, Integer>>() {
+  public final transient DefaultInputPort<Map<String, Integer>> inport = new 
DefaultInputPort<Map<String, Integer>>()
+  {
     @Override
-    public void process(Map<String, Integer> s) {
-       for(Map.Entry<String, Integer> entry : s.entrySet())
-       {
-               if (collect.containsKey(entry.getKey()))
-               {
-                       Integer value = (Integer)collect.remove(entry.getKey());
-                       collect.put(entry.getKey(), value + entry.getValue());
-               } else {
-                       collect.put(entry.getKey(), entry.getValue());
-               }
-       }
+    public void process(Map<String, Integer> s)
+    {
+      for (Map.Entry<String, Integer> entry : s.entrySet()) {
+        if (collect.containsKey(entry.getKey())) {
+          Integer value = (Integer)collect.remove(entry.getKey());
+          collect.put(entry.getKey(), value + entry.getValue());
+        } else {
+          collect.put(entry.getKey(), entry.getValue());
+        }
+      }
     }
-       };
+  };
 
-       @Override
-       public void setup(OperatorContext context)
-       {
-       }
+  @Override
+  public void setup(OperatorContext context)
+  {
+  }
 
-       @Override
-       public void teardown()
-       {
-       }
+  @Override
+  public void teardown()
+  {
+  }
 
-       @Override
-       public void beginWindow(long windowId)
-       {
-               collect  = new HashMap<String, Integer>();
-       }
+  @Override
+  public void beginWindow(long windowId)
+  {
+    collect  = new HashMap<String, Integer>();
+  }
 
-       /**
+  /**
    * The output port on which sums are emitted.
    */
-       public final transient DefaultOutputPort<Map<String, Integer>> outport 
= new DefaultOutputPort<Map<String, Integer>>();
+  public final transient DefaultOutputPort<Map<String, Integer>> outport = new 
DefaultOutputPort<Map<String, Integer>>();
 
-       @Override
-       public void endWindow()
-       {
-               outport.emit(collect);
-       }
+  @Override
+  public void endWindow()
+  {
+    outport.emit(collect);
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/3735316e/library/src/main/java/com/datatorrent/lib/testbench/RandomEventGenerator.java
----------------------------------------------------------------------
diff --git 
a/library/src/main/java/com/datatorrent/lib/testbench/RandomEventGenerator.java 
b/library/src/main/java/com/datatorrent/lib/testbench/RandomEventGenerator.java
index f8c0b51..55b5c04 100644
--- 
a/library/src/main/java/com/datatorrent/lib/testbench/RandomEventGenerator.java
+++ 
b/library/src/main/java/com/datatorrent/lib/testbench/RandomEventGenerator.java
@@ -18,14 +18,15 @@
  */
 package com.datatorrent.lib.testbench;
 
-import com.datatorrent.common.util.BaseOperator;
-import com.datatorrent.api.DefaultOutputPort;
-import com.datatorrent.api.InputOperator;
-import com.datatorrent.api.Context.OperatorContext;
-
 import java.util.Random;
+
 import javax.validation.constraints.Min;
 
+import com.datatorrent.api.Context.OperatorContext;
+import com.datatorrent.api.DefaultOutputPort;
+import com.datatorrent.api.InputOperator;
+import com.datatorrent.common.util.BaseOperator;
+
 /**
  * Generates synthetic load.&nbsp;Creates tuples using random numbers and 
keeps emitting them on the output port string_data and integer_data.
  * <p>
@@ -117,7 +118,8 @@ public class RandomEventGenerator extends BaseOperator 
implements InputOperator
     tuplesBlast = i;
   }
 
-  public void setTuplesBlastIntervalMillis(int tuplesBlastIntervalMillis) {
+  public void setTuplesBlastIntervalMillis(int tuplesBlastIntervalMillis)
+  {
     this.tuplesBlastIntervalMillis = tuplesBlastIntervalMillis;
   }
 
@@ -172,6 +174,7 @@ public class RandomEventGenerator extends BaseOperator 
implements InputOperator
       try {
         Thread.sleep(tuplesBlastIntervalMillis);
       } catch (InterruptedException e) {
+        //fixme
       }
     }
   }

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/3735316e/library/src/main/java/com/datatorrent/lib/testbench/RandomWordGenerator.java
----------------------------------------------------------------------
diff --git 
a/library/src/main/java/com/datatorrent/lib/testbench/RandomWordGenerator.java 
b/library/src/main/java/com/datatorrent/lib/testbench/RandomWordGenerator.java
index e88f06f..6d73cab 100644
--- 
a/library/src/main/java/com/datatorrent/lib/testbench/RandomWordGenerator.java
+++ 
b/library/src/main/java/com/datatorrent/lib/testbench/RandomWordGenerator.java
@@ -18,11 +18,13 @@
  */
 package com.datatorrent.lib.testbench;
 
+import java.util.Random;
+
+import javax.validation.constraints.Min;
+
 import com.datatorrent.api.Context.OperatorContext;
 import com.datatorrent.api.DefaultOutputPort;
 import com.datatorrent.api.InputOperator;
-import java.util.Random;
-import javax.validation.constraints.Min;
 
 /**
  * This is an input operator which generates random tuples that are an array 
of bytes.
@@ -87,10 +89,7 @@ public class RandomWordGenerator implements InputOperator
   @Override
   public void emitTuples()
   {
-    for(;
-        tupleCounter < tuplesPerWindow;
-        tupleCounter++)
-    {
+    for (; tupleCounter < tuplesPerWindow; tupleCounter++) {
       byte[] bytes = new byte[tupleSize];
       random.nextBytes(bytes);
       output.emit(bytes);

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/3735316e/library/src/main/java/com/datatorrent/lib/testbench/RedisSumOper.java
----------------------------------------------------------------------
diff --git 
a/library/src/main/java/com/datatorrent/lib/testbench/RedisSumOper.java 
b/library/src/main/java/com/datatorrent/lib/testbench/RedisSumOper.java
index 1e03b29..8c5a24e 100644
--- a/library/src/main/java/com/datatorrent/lib/testbench/RedisSumOper.java
+++ b/library/src/main/java/com/datatorrent/lib/testbench/RedisSumOper.java
@@ -22,10 +22,10 @@ import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.Map;
 
-import com.datatorrent.common.util.BaseOperator;
+import com.datatorrent.api.Context.OperatorContext;
 import com.datatorrent.api.DefaultInputPort;
 import com.datatorrent.api.DefaultOutputPort;
-import com.datatorrent.api.Context.OperatorContext;
+import com.datatorrent.common.util.BaseOperator;
 
 /**
  * This operator collects integer tuples, then emits their sum at the end of 
the window.
@@ -37,46 +37,50 @@ import com.datatorrent.api.Context.OperatorContext;
  */
 public class RedisSumOper extends BaseOperator
 {
-       private ArrayList<Integer> collect;
+  private ArrayList<Integer> collect;
 
   /**
    * This is the input port which receives integer tuples to be summed.
    */
-       public final transient DefaultInputPort<Integer> inport = new 
DefaultInputPort<Integer>() {
-           @Override
-           public void process(Integer s) {
-             collect.add(s);
-           }
-       };
+  public final transient DefaultInputPort<Integer> inport = new 
DefaultInputPort<Integer>()
+  {
+    @Override
+    public void process(Integer s)
+    {
+      collect.add(s);
+    }
+  };
 
-       @Override
-       public void setup(OperatorContext context)
-       {
-       }
+  @Override
+  public void setup(OperatorContext context)
+  {
+  }
 
-       @Override
-       public void teardown()
-       {
-       }
+  @Override
+  public void teardown()
+  {
+  }
 
-       @Override
-       public void beginWindow(long windowId)
-       {
-               collect  = new ArrayList<Integer>();
-       }
+  @Override
+  public void beginWindow(long windowId)
+  {
+    collect  = new ArrayList<Integer>();
+  }
 
-       /**
+  /**
    * This is the output port which emits the summed tuples.
    */
-       public final transient DefaultOutputPort<Map<Integer, Integer>> outport 
= new DefaultOutputPort<Map<Integer, Integer>>();
+  public final transient DefaultOutputPort<Map<Integer, Integer>> outport = 
new DefaultOutputPort<Map<Integer, Integer>>();
 
-       @Override
-       public void endWindow()
-       {
-               Integer sum = 0;
-               for(Integer entry : collect) sum += entry;
-               Map<Integer, Integer> tuple = new HashMap<Integer, Integer>();
-               tuple.put(1, sum);
-               outport.emit(tuple);
-       }
+  @Override
+  public void endWindow()
+  {
+    Integer sum = 0;
+    for (Integer entry : collect) {
+      sum += entry;
+    }
+    Map<Integer, Integer> tuple = new HashMap<Integer, Integer>();
+    tuple.put(1, sum);
+    outport.emit(tuple);
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/3735316e/library/src/main/java/com/datatorrent/lib/testbench/SeedEventClassifier.java
----------------------------------------------------------------------
diff --git 
a/library/src/main/java/com/datatorrent/lib/testbench/SeedEventClassifier.java 
b/library/src/main/java/com/datatorrent/lib/testbench/SeedEventClassifier.java
index 20fefc8..efbdca8 100644
--- 
a/library/src/main/java/com/datatorrent/lib/testbench/SeedEventClassifier.java
+++ 
b/library/src/main/java/com/datatorrent/lib/testbench/SeedEventClassifier.java
@@ -18,14 +18,15 @@
  */
 package com.datatorrent.lib.testbench;
 
-import com.datatorrent.common.util.BaseOperator;
-import com.datatorrent.api.DefaultInputPort;
-import com.datatorrent.api.DefaultOutputPort;
-import com.datatorrent.api.Context.OperatorContext;
-
 import java.util.HashMap;
+
 import javax.validation.constraints.NotNull;
 
+import com.datatorrent.api.Context.OperatorContext;
+import com.datatorrent.api.DefaultInputPort;
+import com.datatorrent.api.DefaultOutputPort;
+import com.datatorrent.common.util.BaseOperator;
+
 /**
  * This operator receives data on two input ports (data1, and data2).&nbsp;
  * Each incoming tuple is given a seed value
@@ -129,6 +130,7 @@ public class SeedEventClassifier<T> extends BaseOperator
       seed = s_start;
     }
   }
+
   /**
    * Data for classification values
    */

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/3735316e/library/src/main/java/com/datatorrent/lib/testbench/SeedEventGenerator.java
----------------------------------------------------------------------
diff --git 
a/library/src/main/java/com/datatorrent/lib/testbench/SeedEventGenerator.java 
b/library/src/main/java/com/datatorrent/lib/testbench/SeedEventGenerator.java
index f1541a5..3d02051 100644
--- 
a/library/src/main/java/com/datatorrent/lib/testbench/SeedEventGenerator.java
+++ 
b/library/src/main/java/com/datatorrent/lib/testbench/SeedEventGenerator.java
@@ -18,16 +18,18 @@
  */
 package com.datatorrent.lib.testbench;
 
-import com.datatorrent.common.util.BaseOperator;
-import com.datatorrent.api.DefaultOutputPort;
-import com.datatorrent.api.InputOperator;
-import com.datatorrent.lib.util.KeyValPair;
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.Random;
+
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import com.datatorrent.api.DefaultOutputPort;
+import com.datatorrent.api.InputOperator;
+import com.datatorrent.common.util.BaseOperator;
+import com.datatorrent.lib.util.KeyValPair;
+
 /**
  * Generates a one time seed load based on the range provided by the keys,
  * and adds new classification to incoming keys.&nbsp;
@@ -135,8 +137,7 @@ public class SeedEventGenerator extends BaseOperator 
implements InputOperator
       for (int i = lstart; i < lend; i++) {
         emitTuple(i);
       }
-    }
-    else {
+    } else {
       for (int i = lstart; i > lend; i--) {
         emitTuple(i);
       }

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/3735316e/library/src/main/java/com/datatorrent/lib/testbench/SumTestSink.java
----------------------------------------------------------------------
diff --git 
a/library/src/main/java/com/datatorrent/lib/testbench/SumTestSink.java 
b/library/src/main/java/com/datatorrent/lib/testbench/SumTestSink.java
index c10c784..03f0c42 100644
--- a/library/src/main/java/com/datatorrent/lib/testbench/SumTestSink.java
+++ b/library/src/main/java/com/datatorrent/lib/testbench/SumTestSink.java
@@ -45,7 +45,7 @@ public class SumTestSink<T> implements Sink<T>
   public void put(T payload)
   {
     if (payload instanceof Number) {
-      val += ((Number) payload).doubleValue();
+      val += ((Number)payload).doubleValue();
     }
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/3735316e/library/src/main/java/com/datatorrent/lib/testbench/ThroughputCounter.java
----------------------------------------------------------------------
diff --git 
a/library/src/main/java/com/datatorrent/lib/testbench/ThroughputCounter.java 
b/library/src/main/java/com/datatorrent/lib/testbench/ThroughputCounter.java
index c645619..72a0b1f 100644
--- a/library/src/main/java/com/datatorrent/lib/testbench/ThroughputCounter.java
+++ b/library/src/main/java/com/datatorrent/lib/testbench/ThroughputCounter.java
@@ -18,18 +18,19 @@
  */
 package com.datatorrent.lib.testbench;
 
-
-import com.datatorrent.common.util.BaseOperator;
-import com.datatorrent.api.DefaultInputPort;
-import com.datatorrent.api.DefaultOutputPort;
-import com.datatorrent.api.Context.OperatorContext;
-
 import java.util.HashMap;
 import java.util.Map;
+
 import javax.validation.constraints.Min;
+
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import com.datatorrent.api.Context.OperatorContext;
+import com.datatorrent.api.DefaultInputPort;
+import com.datatorrent.api.DefaultOutputPort;
+import com.datatorrent.common.util.BaseOperator;
+
 /**
  * This operator expects incoming tuples to be of type HashMap&lt;String, 
Integer&gt;.&nbsp;
  * These values are throughput per window from upstream operators.&nbsp;
@@ -156,8 +157,7 @@ public class ThroughputCounter<K, V extends Number> extends 
BaseOperator
     long tuples_per_sec = (tuple_count * 1000) / elapsedTime; // * 1000 as 
elapsedTime is in millis
     if (rolling_window_count == 1) {
       average = tuples_per_sec;
-    }
-    else { // use tuple_numbers
+    } else { // use tuple_numbers
       long slots;
       if (count_denominator == rolling_window_count) {
         tuple_numbers[tuple_index] = tuple_count;
@@ -167,8 +167,7 @@ public class ThroughputCounter<K, V extends Number> extends 
BaseOperator
         if (tuple_index == rolling_window_count) {
           tuple_index = 0;
         }
-      }
-      else {
+      } else {
         tuple_numbers[count_denominator - 1] = tuple_count;
         time_numbers[count_denominator - 1] = elapsedTime;
         slots = count_denominator;

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/3735316e/library/src/main/java/com/datatorrent/lib/testbench/TopOccurrence.java
----------------------------------------------------------------------
diff --git 
a/library/src/main/java/com/datatorrent/lib/testbench/TopOccurrence.java 
b/library/src/main/java/com/datatorrent/lib/testbench/TopOccurrence.java
index 24e742b..3f3da57 100644
--- a/library/src/main/java/com/datatorrent/lib/testbench/TopOccurrence.java
+++ b/library/src/main/java/com/datatorrent/lib/testbench/TopOccurrence.java
@@ -23,9 +23,9 @@ import java.util.Collections;
 import java.util.HashMap;
 import java.util.Map;
 
-import com.datatorrent.common.util.BaseOperator;
 import com.datatorrent.api.DefaultInputPort;
 import com.datatorrent.api.DefaultOutputPort;
+import com.datatorrent.common.util.BaseOperator;
 
 /**
  * This operator consumes key value pairs of strings and integers.&nbsp;
@@ -38,110 +38,106 @@ import com.datatorrent.api.DefaultOutputPort;
  */
 public class TopOccurrence extends BaseOperator
 {
-       // n value
-       private int n = 5;
-       private int threshold = 5;
+  // n value
+  private int n = 5;
+  private int threshold = 5;
 
   /**
    *
    */
-       public final transient DefaultOutputPort<Map<Integer, String>> outport 
= new DefaultOutputPort<Map<Integer, String>>();
-       /**
+  public final transient DefaultOutputPort<Map<Integer, String>> outport = new 
DefaultOutputPort<>();
+  /**
    *
    */
-  public final transient DefaultOutputPort<Map<Integer, String>> gtThreshold = 
new DefaultOutputPort<Map<Integer, String>>();
+  public final transient DefaultOutputPort<Map<Integer, String>> gtThreshold = 
new DefaultOutputPort<>();
 
-       // input port
-       public final transient DefaultInputPort<Map<String, Integer>> inport =
-                        new DefaultInputPort<Map<String, Integer>>() {
+  // input port
+  public final transient DefaultInputPort<Map<String, Integer>> inport = new 
DefaultInputPort<Map<String, Integer>>()
+  {
     @Override
     public void process(Map<String, Integer> tuple)
     {
       int numOuts = 0;
-      if (tuple.size() < n)
-      {
-       for (Map.Entry<String, Integer> entry : tuple.entrySet())
-       {
-               Map<Integer, String> out = new HashMap<Integer, String>();
-               String value = new 
StringBuilder(entry.getKey()).append("##").append(entry.getValue()).toString();
-               out.put(numOuts++, value);
-               outport.emit(out);
-       }
-       while(numOuts < n)
-       {
-               Map<Integer, String> out = new HashMap<Integer, String>();
-               out.put(numOuts++, "");
-               outport.emit(out);
-       }
+      if (tuple.size() < n) {
+        for (Map.Entry<String, Integer> entry : tuple.entrySet()) {
+          Map<Integer, String> out = new HashMap<Integer, String>();
+          String value = new 
StringBuilder(entry.getKey()).append("##").append(entry.getValue()).toString();
+          out.put(numOuts++, value);
+          outport.emit(out);
+        }
+        while (numOuts < n) {
+          Map<Integer, String> out = new HashMap<Integer, String>();
+          out.put(numOuts++, "");
+          outport.emit(out);
+        }
       } else {
 
-               ArrayList<Integer> values = new ArrayList<Integer>();
-               for (Map.Entry<String, Integer> entry : tuple.entrySet())
-               {
-                 values.add(entry.getValue());
-               }
-               Collections.sort(values);
-               for (int i=values.size()-1; i >= 0; i--)
-               {
-                 for (Map.Entry<String, Integer> entry : tuple.entrySet())
-             {
-                       if (entry.getValue() == values.get(i))
-                       {
-                         Map<Integer, String> out = new HashMap<Integer, 
String>();
-                         String value = new 
StringBuilder(entry.getKey()).append("##").append(entry.getValue()).toString();
-                         out.put(numOuts++, value);
-                         outport.emit(out);
-                       }
-                       if (numOuts >= n) break;
-             }
-                 if (numOuts >= n) break;
-               }
+        ArrayList<Integer> values = new ArrayList<Integer>();
+        for (Map.Entry<String, Integer> entry : tuple.entrySet()) {
+          values.add(entry.getValue());
+        }
+        Collections.sort(values);
+
+        for (int i = values.size() - 1; i >= 0; i--) {
+          for (Map.Entry<String, Integer> entry : tuple.entrySet()) {
+            if (entry.getValue() == values.get(i)) {
+              Map<Integer, String> out = new HashMap<Integer, String>();
+              String value = new 
StringBuilder(entry.getKey()).append("##").append(entry.getValue()).toString();
+              out.put(numOuts++, value);
+              outport.emit(out);
+            }
+            if (numOuts >= n) {
+              break;
+            }
+          }
+          if (numOuts >= n) {
+            break;
+          }
+        }
       }
 
       // output greater than threshold
       numOuts = 1;
-      for (Map.Entry<String, Integer> entry : tuple.entrySet())
-      {
-       if (entry.getValue() > threshold)
-       {
-               Map<Integer, String> out = new HashMap<Integer, String>();
-           String value = new 
StringBuilder(entry.getKey()).append("##").append(entry.getValue()).toString();
-                   out.put(numOuts++, value);
-                   gtThreshold.emit(out);
-               }
+      for (Map.Entry<String, Integer> entry : tuple.entrySet()) {
+        if (entry.getValue() > threshold) {
+          Map<Integer, String> out = new HashMap<Integer, String>();
+          String value = new 
StringBuilder(entry.getKey()).append("##").append(entry.getValue()).toString();
+          out.put(numOuts++, value);
+          gtThreshold.emit(out);
+        }
       }
       Map<Integer, String> out = new HashMap<Integer, String>();
-         out.put(0,  new Integer(numOuts).toString());
-         gtThreshold.emit(out);
-     }
-       };
+      out.put(0, new Integer(numOuts).toString());
+      gtThreshold.emit(out);
+    }
+  };
 
-       public int getN()
-       {
-               return n;
-       }
+  public int getN()
+  {
+    return n;
+  }
 
-       /**
-        * Output n top values
-        * @param n
-       */
-       public void setN(int n)
-       {
-               this.n = n;
-       }
+  /**
+   * Output n top values
+   * @param n
+  */
+  public void setN(int n)
+  {
+    this.n = n;
+  }
 
-       public int getThreshold()
-       {
-               return threshold;
-       }
+  public int getThreshold()
+  {
+    return threshold;
+  }
 
-       /**
-        * Emit the tuples only if it's value is greater than the threshold.
-        * @param threshold
-       */
-       public void setThreshold(int threshold)
-       {
-               this.threshold = threshold;
-       }
+  /**
+   * Emit the tuples only if it's value is greater than the threshold.
+   * @param threshold
+  */
+  public void setThreshold(int threshold)
+  {
+    this.threshold = threshold;
+  }
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/3735316e/library/src/main/java/com/datatorrent/lib/transform/TransformOperator.java
----------------------------------------------------------------------
diff --git 
a/library/src/main/java/com/datatorrent/lib/transform/TransformOperator.java 
b/library/src/main/java/com/datatorrent/lib/transform/TransformOperator.java
index a40bb97..309560b 100644
--- a/library/src/main/java/com/datatorrent/lib/transform/TransformOperator.java
+++ b/library/src/main/java/com/datatorrent/lib/transform/TransformOperator.java
@@ -89,7 +89,8 @@ public class TransformOperator extends BaseOperator 
implements Operator.Activati
   @OutputPortFieldAnnotation(schemaRequired = true)
   public final transient DefaultOutputPort<Object> output = new 
DefaultOutputPort<Object>()
   {
-    @Override public void setup(Context.PortContext context)
+    @Override
+    public void setup(Context.PortContext context)
     {
       outputClass = context.getValue(Context.PortContext.TUPLE_CLASS);
     }

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/3735316e/library/src/main/java/com/datatorrent/lib/util/AbstractBaseFrequentKey.java
----------------------------------------------------------------------
diff --git 
a/library/src/main/java/com/datatorrent/lib/util/AbstractBaseFrequentKey.java 
b/library/src/main/java/com/datatorrent/lib/util/AbstractBaseFrequentKey.java
index a9e6d29..5a4d721 100644
--- 
a/library/src/main/java/com/datatorrent/lib/util/AbstractBaseFrequentKey.java
+++ 
b/library/src/main/java/com/datatorrent/lib/util/AbstractBaseFrequentKey.java
@@ -53,25 +53,28 @@ public abstract class AbstractBaseFrequentKey<K> extends 
BaseKeyOperator<K>
     }
     count.increment();
   }
+
   protected HashMap<K, MutableInt> keycount = new HashMap<K, MutableInt>();
 
   /**
    * override emitTuple to decide the port to emit to
    * @param tuple
    */
-  abstract public void emitTuple(HashMap<K,Integer> tuple);
+  public abstract void emitTuple(HashMap<K,Integer> tuple);
+
   /**
    * Overide emitList to specify the emit schema
    * @param tlist
    */
-  abstract public void emitList(ArrayList<HashMap<K, Integer>> tlist);
+  public abstract void emitList(ArrayList<HashMap<K, Integer>> tlist);
+
   /**
    * Override compareCount to decide most vs least
    * @param val1
    * @param val2
    * @return result of compareCount to be done by subclass
    */
-  abstract public boolean compareCount(int val1, int val2);
+  public abstract boolean compareCount(int val1, int val2);
 
   /**
    * Emits the result.
@@ -88,14 +91,12 @@ public abstract class AbstractBaseFrequentKey<K> extends 
BaseKeyOperator<K>
         key = e.getKey();
         kval = e.getValue().intValue();
         map.put(key, null);
-      }
-      else if (compareCount(e.getValue().intValue(), kval)) {
+      } else if (compareCount(e.getValue().intValue(), kval)) {
         key = e.getKey();
         kval = e.getValue().intValue();
         map.clear();
         map.put(key, null);
-      }
-      else if (e.getValue().intValue() == kval) {
+      } else if (e.getValue().intValue() == kval) {
         map.put(e.getKey(), null);
       }
     }

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/3735316e/library/src/main/java/com/datatorrent/lib/util/AbstractBaseFrequentKeyValueMap.java
----------------------------------------------------------------------
diff --git 
a/library/src/main/java/com/datatorrent/lib/util/AbstractBaseFrequentKeyValueMap.java
 
b/library/src/main/java/com/datatorrent/lib/util/AbstractBaseFrequentKeyValueMap.java
index 0b2c360..f96b792 100644
--- 
a/library/src/main/java/com/datatorrent/lib/util/AbstractBaseFrequentKeyValueMap.java
+++ 
b/library/src/main/java/com/datatorrent/lib/util/AbstractBaseFrequentKeyValueMap.java
@@ -18,12 +18,13 @@
  */
 package com.datatorrent.lib.util;
 
-import com.datatorrent.api.DefaultInputPort;
-
 import java.util.HashMap;
 import java.util.Map;
+
 import org.apache.commons.lang.mutable.MutableInt;
 
+import com.datatorrent.api.DefaultInputPort;
+
 /**
  * This is the base implementation of an operator, which takes key value pairs 
as inputs.&nbsp;
  * It counts the number of times each key value pair occurs.&nbsp;
@@ -104,14 +105,12 @@ public abstract class AbstractBaseFrequentKeyValueMap<K, 
V> extends BaseKeyValue
           val = v.getKey();
           kval = v.getValue().intValue();
           vmap.put(val, null);
-        }
-        else if (compareValue(v.getValue().intValue(), kval)) {
+        } else if (compareValue(v.getValue().intValue(), kval)) {
           val = v.getKey();
           kval = v.getValue().intValue();
           vmap.clear();
           vmap.put(val, null);
-        }
-        else if (v.getValue().intValue() == kval) {
+        } else if (v.getValue().intValue() == kval) {
           vmap.put(v.getKey(), null);
         }
       }

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/3735316e/library/src/main/java/com/datatorrent/lib/util/AbstractBaseMatchOperator.java
----------------------------------------------------------------------
diff --git 
a/library/src/main/java/com/datatorrent/lib/util/AbstractBaseMatchOperator.java 
b/library/src/main/java/com/datatorrent/lib/util/AbstractBaseMatchOperator.java
index b7f4d97..f7d78db 100644
--- 
a/library/src/main/java/com/datatorrent/lib/util/AbstractBaseMatchOperator.java
+++ 
b/library/src/main/java/com/datatorrent/lib/util/AbstractBaseMatchOperator.java
@@ -18,11 +18,11 @@
  */
 package com.datatorrent.lib.util;
 
-import com.datatorrent.api.Context.OperatorContext;
-import com.datatorrent.lib.util.BaseKeyValueOperator;
 import javax.validation.constraints.NotNull;
 import javax.validation.constraints.Pattern;
 
+import com.datatorrent.api.Context.OperatorContext;
+
 /**
  * This is the base implementation of operators which perform 
comparisons.&nbsp;
  * A concrete operator should be created from this skeleton implementation.
@@ -60,7 +60,8 @@ public abstract class AbstractBaseMatchOperator<K,V extends 
Comparable> extends
   public enum supported_type
   {
     LTE, LT, EQ, NEQ, GT, GTE
-  };
+  }
+
   supported_type type = supported_type.EQ;
 
   /**
@@ -166,23 +167,17 @@ public abstract class AbstractBaseMatchOperator<K,V 
extends Comparable> extends
   {
     if (cmp.equals("lt")) {
       setTypeLT();
-    }
-    else if (cmp.equals("lte")) {
+    } else if (cmp.equals("lte")) {
       setTypeLTE();
-    }
-    else if (cmp.equals("eq")) {
+    } else if (cmp.equals("eq")) {
       setTypeEQ();
-    }
-    else if (cmp.equals("ne")) {
+    } else if (cmp.equals("ne")) {
       setTypeEQ();
-    }
-    else if (cmp.equals("gt")) {
+    } else if (cmp.equals("gt")) {
       setTypeGT();
-    }
-    else if (cmp.equals("gte")) {
+    } else if (cmp.equals("gte")) {
       setTypeGTE();
-    }
-    else {
+    } else {
       setTypeEQ();
     }
   }

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/3735316e/library/src/main/java/com/datatorrent/lib/util/AbstractBaseNNonUniqueOperatorMap.java
----------------------------------------------------------------------
diff --git 
a/library/src/main/java/com/datatorrent/lib/util/AbstractBaseNNonUniqueOperatorMap.java
 
b/library/src/main/java/com/datatorrent/lib/util/AbstractBaseNNonUniqueOperatorMap.java
index ef4c9e4..47282b1 100644
--- 
a/library/src/main/java/com/datatorrent/lib/util/AbstractBaseNNonUniqueOperatorMap.java
+++ 
b/library/src/main/java/com/datatorrent/lib/util/AbstractBaseNNonUniqueOperatorMap.java
@@ -52,13 +52,13 @@ public abstract class AbstractBaseNNonUniqueOperatorMap<K, 
V> extends AbstractBa
    * Override to decide the direction (ascending vs descending)
    * @return true if ascending, to be done by sub-class
    */
-  abstract public boolean isAscending();
+  public abstract boolean isAscending();
 
   /**
    * Override to decide which port to emit to and its schema
    * @param tuple
    */
-  abstract public void emit(HashMap<K, ArrayList<V>> tuple);
+  public abstract void emit(HashMap<K, ArrayList<V>> tuple);
 
   /**
    *
@@ -75,8 +75,7 @@ public abstract class AbstractBaseNNonUniqueOperatorMap<K, V> 
extends AbstractBa
         pqueue = new TopNSort<V>(5, n, isAscending());
         kmap.put(cloneKey(e.getKey()), pqueue);
         pqueue.offer(cloneValue(e.getValue()));
-      }
-      else {
+      } else {
         pqueue.offer(e.getValue());
       }
     }

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/3735316e/library/src/main/java/com/datatorrent/lib/util/AbstractBaseNOperatorMap.java
----------------------------------------------------------------------
diff --git 
a/library/src/main/java/com/datatorrent/lib/util/AbstractBaseNOperatorMap.java 
b/library/src/main/java/com/datatorrent/lib/util/AbstractBaseNOperatorMap.java
index 236a05c..4f6ab85 100644
--- 
a/library/src/main/java/com/datatorrent/lib/util/AbstractBaseNOperatorMap.java
+++ 
b/library/src/main/java/com/datatorrent/lib/util/AbstractBaseNOperatorMap.java
@@ -52,7 +52,7 @@ import com.datatorrent.api.StreamCodec;
  * @tags rank, key value
  * @since 0.3.2
  */
-abstract public class AbstractBaseNOperatorMap<K,V> extends 
BaseKeyValueOperator<K,V>
+public abstract class AbstractBaseNOperatorMap<K,V> extends 
BaseKeyValueOperator<K,V>
 {
   /**
    * This is the input port that receives key value pairs.
@@ -86,7 +86,7 @@ abstract public class AbstractBaseNOperatorMap<K,V> extends 
BaseKeyValueOperator
    *
    * @param tuple
    */
-  abstract public void processTuple(Map<K,V> tuple);
+  public abstract void processTuple(Map<K,V> tuple);
 
   /**
    * Sets value of N (depth)


Reply via email to