http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/3735316e/library/src/main/java/com/datatorrent/lib/logs/MultiWindowDimensionAggregation.java
----------------------------------------------------------------------
diff --git 
a/library/src/main/java/com/datatorrent/lib/logs/MultiWindowDimensionAggregation.java
 
b/library/src/main/java/com/datatorrent/lib/logs/MultiWindowDimensionAggregation.java
index 7c9c350..3cace73 100644
--- 
a/library/src/main/java/com/datatorrent/lib/logs/MultiWindowDimensionAggregation.java
+++ 
b/library/src/main/java/com/datatorrent/lib/logs/MultiWindowDimensionAggregation.java
@@ -27,9 +27,10 @@ import java.util.regex.Pattern;
 
 import javax.validation.constraints.NotNull;
 
-import org.apache.commons.lang.mutable.MutableDouble;
-import org.slf4j.LoggerFactory;
 import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.commons.lang.mutable.MutableDouble;
 
 import com.datatorrent.api.Context.OperatorContext;
 import com.datatorrent.api.DefaultInputPort;
@@ -52,9 +53,10 @@ public class MultiWindowDimensionAggregation implements 
Operator
   @SuppressWarnings("unused")
   private static final Logger logger = 
LoggerFactory.getLogger(MultiWindowDimensionAggregation.class);
 
-  public enum AggregateOperation {
+  public enum AggregateOperation
+  {
     SUM, AVERAGE
-  };
+  }
 
   private int windowSize = 2;
   private int currentWindow = 0;
@@ -76,7 +78,8 @@ public class MultiWindowDimensionAggregation implements 
Operator
   /**
    * This is the input port which receives multi dimensional data.
    */
-  public final transient DefaultInputPort<Map<String, Map<String, Number>>> 
data = new DefaultInputPort<Map<String, Map<String, Number>>>() {
+  public final transient DefaultInputPort<Map<String, Map<String, Number>>> 
data = new DefaultInputPort<Map<String, Map<String, Number>>>()
+  {
     @Override
     public void process(Map<String, Map<String, Number>> tuple)
     {
@@ -169,12 +172,15 @@ public class MultiWindowDimensionAggregation implements 
Operator
   @Override
   public void setup(OperatorContext arg0)
   {
-    if (arg0 != null)
+    if (arg0 != null) {
       applicationWindowSize = 
arg0.getValue(OperatorContext.APPLICATION_WINDOW_COUNT);
-    if (cacheOject == null)
+    }
+    if (cacheOject == null) {
       cacheOject = new HashMap<Integer, Map<String, Map<String, 
Number>>>(windowSize);
-    if (outputMap == null)
+    }
+    if (outputMap == null) {
       outputMap = new HashMap<String, Map<String, KeyValPair<MutableDouble, 
Integer>>>();
+    }
     setUpPatternList();
 
   }
@@ -238,8 +244,9 @@ public class MultiWindowDimensionAggregation implements 
Operator
       }
     }
     currentWindowMap.clear();
-    if (patternList == null || patternList.isEmpty())
+    if (patternList == null || patternList.isEmpty()) {
       setUpPatternList();
+    }
 
   }
 
@@ -255,12 +262,13 @@ public class MultiWindowDimensionAggregation implements 
Operator
           outputData.put(e.getKey(), new 
DimensionObject<String>(keyVal.getKey(), dimensionValObj.getKey()));
         } else if (operationType == AggregateOperation.AVERAGE) {
           if (keyVal.getValue() != 0) {
-            double totalCount = ((double) (totalWindowsOccupied * 
applicationWindowSize)) / 1000;
+            double totalCount = ((double)(totalWindowsOccupied * 
applicationWindowSize)) / 1000;
             outputData.put(e.getKey(), new DimensionObject<String>(new 
MutableDouble(keyVal.getKey().doubleValue() / totalCount), 
dimensionValObj.getKey()));
           }
         }
-        if (!outputData.isEmpty())
+        if (!outputData.isEmpty()) {
           output.emit(outputData);
+        }
       }
     }
     currentWindow = (currentWindow + 1) % windowSize;

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/3735316e/library/src/main/java/com/datatorrent/lib/logs/RegexMatchMapOperator.java
----------------------------------------------------------------------
diff --git 
a/library/src/main/java/com/datatorrent/lib/logs/RegexMatchMapOperator.java 
b/library/src/main/java/com/datatorrent/lib/logs/RegexMatchMapOperator.java
index d64c634..e469b0c 100644
--- a/library/src/main/java/com/datatorrent/lib/logs/RegexMatchMapOperator.java
+++ b/library/src/main/java/com/datatorrent/lib/logs/RegexMatchMapOperator.java
@@ -20,16 +20,19 @@ package com.datatorrent.lib.logs;
 
 import java.util.HashMap;
 import java.util.Map;
-import com.google.code.regexp.Pattern;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
 import com.google.code.regexp.Matcher;
-import com.datatorrent.common.util.BaseOperator;
+import com.google.code.regexp.Pattern;
+
+import com.datatorrent.api.Context.OperatorContext;
 import com.datatorrent.api.DefaultInputPort;
 import com.datatorrent.api.DefaultOutputPort;
-import com.datatorrent.api.Context.OperatorContext;
 import com.datatorrent.api.annotation.OperatorAnnotation;
 import com.datatorrent.api.annotation.Stateless;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+import com.datatorrent.common.util.BaseOperator;
 
 /**
  * This operator parses unstructured log data into named fields.
@@ -77,7 +80,7 @@ import org.slf4j.LoggerFactory;
  * @since 1.0.5
  */
 @Stateless
-@OperatorAnnotation(partitionable=true)
+@OperatorAnnotation(partitionable = true)
 public class RegexMatchMapOperator extends BaseOperator
 {
   /**

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/3735316e/library/src/main/java/com/datatorrent/lib/math/AbstractAggregateCalc.java
----------------------------------------------------------------------
diff --git 
a/library/src/main/java/com/datatorrent/lib/math/AbstractAggregateCalc.java 
b/library/src/main/java/com/datatorrent/lib/math/AbstractAggregateCalc.java
index 7f903fc..5f09a4b 100644
--- a/library/src/main/java/com/datatorrent/lib/math/AbstractAggregateCalc.java
+++ b/library/src/main/java/com/datatorrent/lib/math/AbstractAggregateCalc.java
@@ -18,10 +18,10 @@
  */
 package com.datatorrent.lib.math;
 
-import com.datatorrent.api.DefaultInputPort;
-
 import java.util.Collection;
 
+import com.datatorrent.api.DefaultInputPort;
+
 /**
  * Aggregates input tuples that are collections of longs and double and emits 
result on four ports.
  * <p>
@@ -49,54 +49,52 @@ import java.util.Collection;
  * @since 0.3.3
  */
 public abstract class AbstractAggregateCalc<T extends Number> extends
-               AbstractOutput
+    AbstractOutput
 {
-       /**
-        * Input port, accepts collection of values of type 'T'.
-        */
-       public final transient DefaultInputPort<Collection<T>> input = new 
DefaultInputPort<Collection<T>>()
-       {
-               /**
-                * Aggregate calculation result is only emitted on output port 
if it is connected.
-                */
-               @Override
-               public void process(Collection<T> collection)
-               {
-                       Double dResult = null;
-                       if (doubleResult.isConnected()) {
-                               doubleResult.emit(dResult = 
aggregateDoubles(collection));
-                       }
+  /**
+   * Input port, accepts collection of values of type 'T'.
+   */
+  public final transient DefaultInputPort<Collection<T>> input = new 
DefaultInputPort<Collection<T>>()
+  {
+    /**
+     * Aggregate calculation result is only emitted on output port if it is 
connected.
+     */
+    @Override
+    public void process(Collection<T> collection)
+    {
+      Double dResult = null;
+      if (doubleResult.isConnected()) {
+        doubleResult.emit(dResult = aggregateDoubles(collection));
+      }
 
-                       if (floatResult.isConnected()) {
-                               floatResult
-                                               .emit(dResult == null ? (float) 
(aggregateDoubles(collection))
-                                                               : 
dResult.floatValue());
-                       }
+      if (floatResult.isConnected()) {
+        floatResult.emit(dResult == null ? 
(float)(aggregateDoubles(collection)) : dResult.floatValue());
+      }
 
-                       Long lResult = null;
-                       if (longResult.isConnected()) {
-                               longResult.emit(lResult = 
aggregateLongs(collection));
-                       }
+      Long lResult = null;
+      if (longResult.isConnected()) {
+        longResult.emit(lResult = aggregateLongs(collection));
+      }
 
-                       if (integerResult.isConnected()) {
-                               integerResult.emit(lResult == null ? (int) 
aggregateLongs(collection)
-                                               : lResult.intValue());
-                       }
-               }
+      if (integerResult.isConnected()) {
+        integerResult.emit(lResult == null ? (int)aggregateLongs(collection)
+            : lResult.intValue());
+      }
+    }
 
-       };
+  };
 
-       /**
-        * Abstract function to be implemented by sub class, custom calculation 
on input aggregate.
-        * @param collection Aggregate of values 
-        * @return calculated value.
-        */
-       public abstract long aggregateLongs(Collection<T> collection);
+  /**
+   * Abstract function to be implemented by sub class, custom calculation on 
input aggregate.
+   * @param collection Aggregate of values
+   * @return calculated value.
+   */
+  public abstract long aggregateLongs(Collection<T> collection);
 
-       /**
-        * Abstract function to be implemented by sub class, custom calculation 
on input aggregate.
-        * @param collection Aggregate of values 
-        * @return calculated value.
-        */
-       public abstract double aggregateDoubles(Collection<T> collection);
+  /**
+   * Abstract function to be implemented by sub class, custom calculation on 
input aggregate.
+   * @param collection Aggregate of values
+   * @return calculated value.
+   */
+  public abstract double aggregateDoubles(Collection<T> collection);
 }

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/3735316e/library/src/main/java/com/datatorrent/lib/math/AbstractOutput.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/com/datatorrent/lib/math/AbstractOutput.java 
b/library/src/main/java/com/datatorrent/lib/math/AbstractOutput.java
index bb387b4..9600021 100644
--- a/library/src/main/java/com/datatorrent/lib/math/AbstractOutput.java
+++ b/library/src/main/java/com/datatorrent/lib/math/AbstractOutput.java
@@ -18,9 +18,9 @@
  */
 package com.datatorrent.lib.math;
 
-import com.datatorrent.common.util.BaseOperator;
 import com.datatorrent.api.DefaultOutputPort;
 import com.datatorrent.api.annotation.OutputPortFieldAnnotation;
+import com.datatorrent.common.util.BaseOperator;
 
 /**
  * Abstract base operator defining optional double/float/long/integer output 
port.
@@ -34,27 +34,27 @@ import 
com.datatorrent.api.annotation.OutputPortFieldAnnotation;
  */
 public abstract class AbstractOutput extends BaseOperator
 {
-       /**
-        * Double type output.
-        */
-       @OutputPortFieldAnnotation(optional = true)
-       public final transient DefaultOutputPort<Double> doubleResult = new 
DefaultOutputPort<Double>();
+  /**
+   * Double type output.
+   */
+  @OutputPortFieldAnnotation(optional = true)
+  public final transient DefaultOutputPort<Double> doubleResult = new 
DefaultOutputPort<Double>();
 
-       /**
-        * Float type output.
-        */
-       @OutputPortFieldAnnotation(optional = true)
-       public final transient DefaultOutputPort<Float> floatResult = new 
DefaultOutputPort<Float>();
+  /**
+   * Float type output.
+   */
+  @OutputPortFieldAnnotation(optional = true)
+  public final transient DefaultOutputPort<Float> floatResult = new 
DefaultOutputPort<Float>();
 
-       /**
-        * Long type output.
-        */
-       @OutputPortFieldAnnotation(optional = true)
-       public final transient DefaultOutputPort<Long> longResult = new 
DefaultOutputPort<Long>();
+  /**
+   * Long type output.
+   */
+  @OutputPortFieldAnnotation(optional = true)
+  public final transient DefaultOutputPort<Long> longResult = new 
DefaultOutputPort<Long>();
 
-       /**
-        * Integer type output.
-        */
-       @OutputPortFieldAnnotation(optional = true)
-       public final transient DefaultOutputPort<Integer> integerResult = new 
DefaultOutputPort<Integer>();
+  /**
+   * Integer type output.
+   */
+  @OutputPortFieldAnnotation(optional = true)
+  public final transient DefaultOutputPort<Integer> integerResult = new 
DefaultOutputPort<Integer>();
 }

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/3735316e/library/src/main/java/com/datatorrent/lib/math/AbstractXmlCartesianProduct.java
----------------------------------------------------------------------
diff --git 
a/library/src/main/java/com/datatorrent/lib/math/AbstractXmlCartesianProduct.java
 
b/library/src/main/java/com/datatorrent/lib/math/AbstractXmlCartesianProduct.java
index a6b7ab2..10b6c15 100644
--- 
a/library/src/main/java/com/datatorrent/lib/math/AbstractXmlCartesianProduct.java
+++ 
b/library/src/main/java/com/datatorrent/lib/math/AbstractXmlCartesianProduct.java
@@ -18,17 +18,23 @@
  */
 package com.datatorrent.lib.math;
 
-import com.datatorrent.api.Context;
-import com.datatorrent.netlet.util.DTThrowable;
-import com.datatorrent.lib.xml.AbstractXmlDOMOperator;
+import java.util.ArrayList;
+import java.util.List;
+
+import javax.validation.constraints.NotNull;
+import javax.xml.xpath.XPath;
+import javax.xml.xpath.XPathConstants;
+import javax.xml.xpath.XPathExpression;
+import javax.xml.xpath.XPathExpressionException;
+import javax.xml.xpath.XPathFactory;
+
 import org.w3c.dom.Document;
 import org.w3c.dom.Node;
 import org.w3c.dom.NodeList;
 
-import javax.validation.constraints.NotNull;
-import javax.xml.xpath.*;
-import java.util.ArrayList;
-import java.util.List;
+import com.datatorrent.api.Context;
+import com.datatorrent.lib.xml.AbstractXmlDOMOperator;
+import com.datatorrent.netlet.util.DTThrowable;
 
 /**
  * An operator that performs a cartesian product between different elements in 
a xml document.
@@ -146,7 +152,7 @@ public abstract class AbstractXmlCartesianProduct<T> 
extends AbstractXmlDOMOpera
     try {
       List<String> result = new ArrayList<String>();
       for (CartesianProduct cartesianProduct : cartesianProducts) {
-          cartesianProduct.product(document, result);
+        cartesianProduct.product(document, result);
       }
       processResult(result, tuple);
     } catch (XPathExpressionException e) {
@@ -252,8 +258,11 @@ public abstract class AbstractXmlCartesianProduct<T> 
extends AbstractXmlDOMOpera
           int balance = 1;
           int i;
           for (i = 1; (i < spec.length()) && (balance > 0); ++i) {
-            if (spec.charAt(i) == ')') balance--;
-            else if (spec.charAt(i) == '(') balance++;
+            if (spec.charAt(i) == ')') {
+              balance--;
+            } else if (spec.charAt(i) == '(') {
+              balance++;
+            }
           }
           if (i == spec.length()) {
             estr = spec.substring(1, spec.length() - 1);
@@ -358,10 +367,10 @@ public abstract class AbstractXmlCartesianProduct<T> 
extends AbstractXmlDOMOpera
             int chldEdDelIdx = productSpec.length() - 1;
             int chldSepDelIdx;
             if ((productSpec.charAt(chldStDelIdx) == '(') && 
(productSpec.charAt(chldEdDelIdx) == ')')
-                    && ((chldSepDelIdx = productSpec.indexOf(':')) != -1)) {
+                && ((chldSepDelIdx = productSpec.indexOf(':')) != -1)) {
               String child1Spec = productSpec.substring(chldStDelIdx + 1, 
chldSepDelIdx);
               String child2Spec = productSpec.substring(chldSepDelIdx + 1, 
chldEdDelIdx);
-              parentElement = (SimplePathElement) pathElement;
+              parentElement = (SimplePathElement)pathElement;
               childElement1 = pathElementFactory.getSpecable(child1Spec);
               childElement2 = pathElementFactory.getSpecable(child2Spec);
             }
@@ -419,7 +428,7 @@ public abstract class AbstractXmlCartesianProduct<T> 
extends AbstractXmlDOMOpera
   private List<Node> getNodes(Document document, String path) throws 
XPathExpressionException
   {
     XPathExpression pathExpr = xpath.compile(path);
-    NodeList nodeList = (NodeList) pathExpr.evaluate(document, 
XPathConstants.NODESET);
+    NodeList nodeList = (NodeList)pathExpr.evaluate(document, 
XPathConstants.NODESET);
     List<Node> nodes = new ArrayList<Node>();
     for (int i = 0; i < nodeList.getLength(); ++i) {
       nodes.add(nodeList.item(i));
@@ -459,8 +468,11 @@ public abstract class AbstractXmlCartesianProduct<T> 
extends AbstractXmlDOMOpera
     String delim = getDelim();
     boolean first = true;
     for (Node node : nodes) {
-      if (!first) sb.append(delim);
-      else first = false;
+      if (!first) {
+        sb.append(delim);
+      } else {
+        first = false;
+      }
       sb.append(getValue(node));
     }
     return sb.toString();

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/3735316e/library/src/main/java/com/datatorrent/lib/math/AbstractXmlKeyValueCartesianProduct.java
----------------------------------------------------------------------
diff --git 
a/library/src/main/java/com/datatorrent/lib/math/AbstractXmlKeyValueCartesianProduct.java
 
b/library/src/main/java/com/datatorrent/lib/math/AbstractXmlKeyValueCartesianProduct.java
index 03c98d3..91cc9ba 100644
--- 
a/library/src/main/java/com/datatorrent/lib/math/AbstractXmlKeyValueCartesianProduct.java
+++ 
b/library/src/main/java/com/datatorrent/lib/math/AbstractXmlKeyValueCartesianProduct.java
@@ -39,7 +39,8 @@ public abstract class AbstractXmlKeyValueCartesianProduct<T> 
extends AbstractXml
   }
 
   @Override
-  public boolean isValueNode(Node n) {
+  public boolean isValueNode(Node n)
+  {
     return isTextContainerNode(n);
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/3735316e/library/src/main/java/com/datatorrent/lib/math/Average.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/com/datatorrent/lib/math/Average.java 
b/library/src/main/java/com/datatorrent/lib/math/Average.java
index d956e05..4dfdf1f 100644
--- a/library/src/main/java/com/datatorrent/lib/math/Average.java
+++ b/library/src/main/java/com/datatorrent/lib/math/Average.java
@@ -44,77 +44,77 @@ import com.datatorrent.lib.util.BaseNumberValueOperator;
  */
 public class Average<V extends Number> extends BaseNumberValueOperator<V>
 {
-       /**
-        * Input port that takes a number.
-        */
-       public final transient DefaultInputPort<V> data = new 
DefaultInputPort<V>()
-       {
-               /**
-                * Computes sum and count with each tuple
-                */
-               @Override
-               public void process(V tuple)
-               {
-                       sums += tuple.doubleValue();
-                       counts++;
-               }
-       };
+  /**
+   * Input port that takes a number.
+   */
+  public final transient DefaultInputPort<V> data = new DefaultInputPort<V>()
+  {
+    /**
+     * Computes sum and count with each tuple
+     */
+    @Override
+    public void process(V tuple)
+    {
+      sums += tuple.doubleValue();
+      counts++;
+    }
+  };
 
-       /**
-        * Output port that emits average as a number.
-        */
-       public final transient DefaultOutputPort<V> average = new 
DefaultOutputPort<V>();
+  /**
+   * Output port that emits average as a number.
+   */
+  public final transient DefaultOutputPort<V> average = new 
DefaultOutputPort<V>();
 
-       protected double sums = 0;
-       protected long counts = 0;
+  protected double sums = 0;
+  protected long counts = 0;
 
-       /**
-        * Emit average.
-        */
-       @Override
-       public void endWindow()
-       {
-               // May want to send out only if count != 0
-               if (counts != 0) {
-                       average.emit(getAverage());
-               }
-               sums = 0;
-               counts = 0;
-       }
+  /**
+   * Emit average.
+   */
+  @Override
+  public void endWindow()
+  {
+    // May want to send out only if count != 0
+    if (counts != 0) {
+      average.emit(getAverage());
+    }
+    sums = 0;
+    counts = 0;
+  }
 
-       /**
-        * Calculate average based on number type.
-        */
-       @SuppressWarnings("unchecked")
-       public V getAverage()
-       {
-               if (counts == 0) {
-                       return null;
-               }
-               V num = getValue(sums);
-               Number val;
-               switch (getType()) {
-                       case DOUBLE:
-                               val = new Double(num.doubleValue() / counts);
-                               break;
-                       case INTEGER:
-                               int icount = (int) (num.intValue() / counts);
-                               val = new Integer(icount);
-                               break;
-                       case FLOAT:
-                               val = new Float(num.floatValue() / counts);
-                               break;
-                       case LONG:
-                               val = new Long(num.longValue() / counts);
-                               break;
-                       case SHORT:
-                               short scount = (short) (num.shortValue() / 
counts);
-                               val = new Short(scount);
-                               break;
-                       default:
-                               val = new Double(num.doubleValue() / counts);
-                               break;
-               }
-               return (V) val;
-       }
+  /**
+   * Calculate average based on number type.
+   */
+  @SuppressWarnings("unchecked")
+  public V getAverage()
+  {
+    if (counts == 0) {
+      return null;
+    }
+    V num = getValue(sums);
+    Number val;
+    switch (getType()) {
+      case DOUBLE:
+        val = new Double(num.doubleValue() / counts);
+        break;
+      case INTEGER:
+        int icount = (int)(num.intValue() / counts);
+        val = new Integer(icount);
+        break;
+      case FLOAT:
+        val = new Float(num.floatValue() / counts);
+        break;
+      case LONG:
+        val = new Long(num.longValue() / counts);
+        break;
+      case SHORT:
+        short scount = (short)(num.shortValue() / counts);
+        val = new Short(scount);
+        break;
+      default:
+        val = new Double(num.doubleValue() / counts);
+        break;
+    }
+    return (V)val;
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/3735316e/library/src/main/java/com/datatorrent/lib/math/AverageKeyVal.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/com/datatorrent/lib/math/AverageKeyVal.java 
b/library/src/main/java/com/datatorrent/lib/math/AverageKeyVal.java
index e9e7e40..e443780 100644
--- a/library/src/main/java/com/datatorrent/lib/math/AverageKeyVal.java
+++ b/library/src/main/java/com/datatorrent/lib/math/AverageKeyVal.java
@@ -18,15 +18,17 @@
  */
 package com.datatorrent.lib.math;
 
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.commons.lang.mutable.MutableDouble;
+import org.apache.commons.lang.mutable.MutableLong;
+
 import com.datatorrent.api.DefaultInputPort;
 import com.datatorrent.api.DefaultOutputPort;
 import com.datatorrent.api.annotation.OutputPortFieldAnnotation;
 import com.datatorrent.lib.util.BaseNumberKeyValueOperator;
 import com.datatorrent.lib.util.KeyValPair;
-import java.util.HashMap;
-import java.util.Map;
-import org.apache.commons.lang.mutable.MutableDouble;
-import org.apache.commons.lang.mutable.MutableLong;
 
 /**
  *
@@ -57,84 +59,87 @@ import org.apache.commons.lang.mutable.MutableLong;
  */
 public class AverageKeyVal<K> extends BaseNumberKeyValueOperator<K, Number>
 {
-       // Aggregate sum of all values seen for a key.
-       protected HashMap<K, MutableDouble> sums = new HashMap<K, 
MutableDouble>();
-       
-       // Count of number of values seen for key.
-       protected HashMap<K, MutableLong> counts = new HashMap<K, 
MutableLong>();
-       
-       /**
-        * Input port that takes a key value pair.
-        */
-       public final transient DefaultInputPort<KeyValPair<K, ? extends 
Number>> data = new DefaultInputPort<KeyValPair<K, ? extends Number>>()
-       {
-               /**
-                * Adds the values for each key, counts the number of 
occurrences of each
-                * key and computes the average.
-                */
-               @Override
-               public void process(KeyValPair<K, ? extends Number> tuple)
-               {
-                       K key = tuple.getKey();
-                       if (!doprocessKey(key)) {
-                               return;
-                       }
-                       MutableDouble val = sums.get(key);
-                       if (val == null) {
-                               val = new 
MutableDouble(tuple.getValue().doubleValue());
-                       } else {
-                               val.add(tuple.getValue().doubleValue());
-                       }
-                       sums.put(cloneKey(key), val);
+  // Aggregate sum of all values seen for a key.
+  protected HashMap<K, MutableDouble> sums = new HashMap<K, MutableDouble>();
+
+  // Count of number of values seen for key.
+  protected HashMap<K, MutableLong> counts = new HashMap<K, MutableLong>();
+
+  /**
+   * Input port that takes a key value pair.
+   */
+  public final transient DefaultInputPort<KeyValPair<K, ? extends Number>> 
data = new DefaultInputPort<KeyValPair<K, ? extends Number>>()
+  {
+    /**
+     * Adds the values for each key, counts the number of occurrences of each
+     * key and computes the average.
+     */
+    @Override
+    public void process(KeyValPair<K, ? extends Number> tuple)
+    {
+      K key = tuple.getKey();
+      if (!doprocessKey(key)) {
+        return;
+      }
+      MutableDouble val = sums.get(key);
+      if (val == null) {
+        val = new MutableDouble(tuple.getValue().doubleValue());
+      } else {
+        val.add(tuple.getValue().doubleValue());
+      }
+      sums.put(cloneKey(key), val);
+
+      MutableLong count = counts.get(key);
+      if (count == null) {
+        count = new MutableLong(0);
+        counts.put(cloneKey(key), count);
+      }
+      count.increment();
+    }
+  };
+
+  /**
+   * Double average output port.
+   */
+  @OutputPortFieldAnnotation(optional = true)
+  public final transient DefaultOutputPort<KeyValPair<K, Double>> 
doubleAverage =
+      new DefaultOutputPort<KeyValPair<K, Double>>();
 
-                       MutableLong count = counts.get(key);
-                       if (count == null) {
-                               count = new MutableLong(0);
-                               counts.put(cloneKey(key), count);
-                       }
-                       count.increment();
-               }
-       };
+  /**
+   * Integer average output port.
+   */
+  @OutputPortFieldAnnotation(optional = true)
+  public final transient DefaultOutputPort<KeyValPair<K, Integer>> intAverage =
+      new DefaultOutputPort<KeyValPair<K, Integer>>();
 
-       /**
-        * Double average output port. 
-        */
-       @OutputPortFieldAnnotation(optional = true)
-       public final transient DefaultOutputPort<KeyValPair<K, Double>> 
doubleAverage = new DefaultOutputPort<KeyValPair<K, Double>>();
-       
-       /**
-        * Integer average output port. 
-        */
-       @OutputPortFieldAnnotation(optional = true)
-       public final transient DefaultOutputPort<KeyValPair<K, Integer>> 
intAverage = new DefaultOutputPort<KeyValPair<K, Integer>>();
-       
-       /**
-        * Long average output port. 
-        */
-       @OutputPortFieldAnnotation(optional = true)
-       public final transient DefaultOutputPort<KeyValPair<K, Long>> 
longAverage = new DefaultOutputPort<KeyValPair<K, Long>>();
+  /**
+   * Long average output port.
+   */
+  @OutputPortFieldAnnotation(optional = true)
+  public final transient DefaultOutputPort<KeyValPair<K, Long>> longAverage =
+      new DefaultOutputPort<KeyValPair<K, Long>>();
 
-       /**
-        * Emits average for each key in end window. Data is computed during 
process
-        * on input port Clears the internal data before return.
-        */
-       @Override
-       public void endWindow()
-       {
-               for (Map.Entry<K, MutableDouble> e : sums.entrySet()) {
-                       K key = e.getKey();
-                       double d = e.getValue().doubleValue();
-                       if (doubleAverage.isConnected()) {
-                               doubleAverage.emit(new KeyValPair<K, 
Double>(key, d / counts.get(key).doubleValue()));
-                       }
-                       if (intAverage.isConnected()) {
-                               intAverage.emit(new KeyValPair<K, Integer>(key, 
(int) d));
-                       }
-                       if (longAverage.isConnected()) {
-                               longAverage.emit(new KeyValPair<K, Long>(key, 
(long) d));
-                       }
-               }
-               sums.clear();
-               counts.clear();
-       }
+  /**
+   * Emits average for each key in end window. Data is computed during process
+   * on input port Clears the internal data before return.
+   */
+  @Override
+  public void endWindow()
+  {
+    for (Map.Entry<K, MutableDouble> e : sums.entrySet()) {
+      K key = e.getKey();
+      double d = e.getValue().doubleValue();
+      if (doubleAverage.isConnected()) {
+        doubleAverage.emit(new KeyValPair<K, Double>(key, d / 
counts.get(key).doubleValue()));
+      }
+      if (intAverage.isConnected()) {
+        intAverage.emit(new KeyValPair<K, Integer>(key, (int)d));
+      }
+      if (longAverage.isConnected()) {
+        longAverage.emit(new KeyValPair<K, Long>(key, (long)d));
+      }
+    }
+    sums.clear();
+    counts.clear();
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/3735316e/library/src/main/java/com/datatorrent/lib/math/Change.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/com/datatorrent/lib/math/Change.java 
b/library/src/main/java/com/datatorrent/lib/math/Change.java
index 628839f..57bad6b 100644
--- a/library/src/main/java/com/datatorrent/lib/math/Change.java
+++ b/library/src/main/java/com/datatorrent/lib/math/Change.java
@@ -61,57 +61,57 @@ import com.datatorrent.lib.util.BaseNumberValueOperator;
 public class Change<V extends Number> extends BaseNumberValueOperator<V>
 {
         /**
-        * Input data port that takes a number.
-        */
-       public final transient DefaultInputPort<V> data = new 
DefaultInputPort<V>()
-       {
-               /**
-                * Process each key, compute change or percent, and emit it.
-                */
-               @Override
-               public void process(V tuple)
-               {
-                       if (baseValue != 0) { // Avoid divide by zero, Emit an 
error tuple?
-                               double cval = tuple.doubleValue() - baseValue;
-                               change.emit(getValue(cval));
-                               percent.emit((cval / baseValue) * 100);
-                       }
-               }
-       };
+   * Input data port that takes a number.
+   */
+  public final transient DefaultInputPort<V> data = new DefaultInputPort<V>()
+  {
+    /**
+     * Process each key, compute change or percent, and emit it.
+     */
+    @Override
+    public void process(V tuple)
+    {
+      if (baseValue != 0) { // Avoid divide by zero, Emit an error tuple?
+        double cval = tuple.doubleValue() - baseValue;
+        change.emit(getValue(cval));
+        percent.emit((cval / baseValue) * 100);
+      }
+    }
+  };
         
         /**
-        * Input port that takes a number&nbsp; It stores the value for base 
comparison. 
-        */
-       public final transient DefaultInputPort<V> base = new 
DefaultInputPort<V>()
-       {
-               /**
-                * Process each key to store the value. If same key appears 
again update
-                * with latest value.
-                */
-               @Override
-               public void process(V tuple)
-               {
-                       if (tuple.doubleValue() != 0.0) { // Avoid divide by 
zero, Emit an error
-                                                                               
                                                                                
// tuple?
-                               baseValue = tuple.doubleValue();
-                       }
-               }
-       };
-       
-       /**
-        * Output port that emits change in value compared to base value.
-        */
-       @OutputPortFieldAnnotation(optional = true)
-       public final transient DefaultOutputPort<V> change = new 
DefaultOutputPort<V>();
-       
-       /**
-        * Output port that emits percent change in data value compared to base 
value.
-        */
-       @OutputPortFieldAnnotation(optional = true)
-       public final transient DefaultOutputPort<Double> percent = new 
DefaultOutputPort<Double>();
-       
-       /**
-        * baseValue is a state full field. It is retained across windows.
-        */
-       private double baseValue = 0;
+   * Input port that takes a number&nbsp; It stores the value for base 
comparison.
+   */
+  public final transient DefaultInputPort<V> base = new DefaultInputPort<V>()
+  {
+    /**
+     * Process each key to store the value. If same key appears again update
+     * with latest value.
+     */
+    @Override
+    public void process(V tuple)
+    {
+      if (tuple.doubleValue() != 0.0) { // Avoid divide by zero, Emit an error
+                                        // tuple?
+        baseValue = tuple.doubleValue();
+      }
+    }
+  };
+
+  /**
+   * Output port that emits change in value compared to base value.
+   */
+  @OutputPortFieldAnnotation(optional = true)
+  public final transient DefaultOutputPort<V> change = new 
DefaultOutputPort<V>();
+
+  /**
+   * Output port that emits percent change in data value compared to base 
value.
+   */
+  @OutputPortFieldAnnotation(optional = true)
+  public final transient DefaultOutputPort<Double> percent = new 
DefaultOutputPort<Double>();
+
+  /**
+   * baseValue is a state full field. It is retained across windows.
+   */
+  private double baseValue = 0;
 }

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/3735316e/library/src/main/java/com/datatorrent/lib/math/ChangeAlert.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/com/datatorrent/lib/math/ChangeAlert.java 
b/library/src/main/java/com/datatorrent/lib/math/ChangeAlert.java
index 01a040d..3c48016 100644
--- a/library/src/main/java/com/datatorrent/lib/math/ChangeAlert.java
+++ b/library/src/main/java/com/datatorrent/lib/math/ChangeAlert.java
@@ -53,66 +53,66 @@ import com.datatorrent.lib.util.KeyValPair;
  */
 public class ChangeAlert<V extends Number> extends BaseNumberValueOperator<V>
 {
-       /**
-        * Input port that takes in a number.
-        */
-       public final transient DefaultInputPort<V> data = new 
DefaultInputPort<V>()
-       {
-               /**
-                * Process each key, compute change or percent, and emit it. If 
we get 0 as
-                * tuple next will be skipped.
-                */
-               @Override
-               public void process(V tuple)
-               {
-                       double tval = tuple.doubleValue();
-                       if (baseValue == 0) { // Avoid divide by zero, Emit an 
error tuple?
-                               baseValue = tval;
-                               return;
-                       }
-                       double change = tval - baseValue;
-                       double percent = (change / baseValue) * 100;
-                       if (percent < 0.0) {
-                               percent = 0.0 - percent;
-                       }
-                       if (percent > percentThreshold) {
-                               KeyValPair<V, Double> kv = new KeyValPair<V, 
Double>(cloneKey(tuple),
-                                               percent);
-                               alert.emit(kv);
-                       }
-                       baseValue = tval;
-               }
-       };
+  /**
+   * Input port that takes in a number.
+   */
+  public final transient DefaultInputPort<V> data = new DefaultInputPort<V>()
+  {
+    /**
+     * Process each key, compute change or percent, and emit it. If we get 0 as
+     * tuple next will be skipped.
+     */
+    @Override
+    public void process(V tuple)
+    {
+      double tval = tuple.doubleValue();
+      if (baseValue == 0) { // Avoid divide by zero, Emit an error tuple?
+        baseValue = tval;
+        return;
+      }
+      double change = tval - baseValue;
+      double percent = (change / baseValue) * 100;
+      if (percent < 0.0) {
+        percent = 0.0 - percent;
+      }
+      if (percent > percentThreshold) {
+        KeyValPair<V, Double> kv = new KeyValPair<V, Double>(cloneKey(tuple),
+            percent);
+        alert.emit(kv);
+      }
+      baseValue = tval;
+    }
+  };
 
 
-       /**
-        * Output port which emits a key value pair.
-        */
-       public final transient DefaultOutputPort<KeyValPair<V, Double>> alert = 
new DefaultOutputPort<KeyValPair<V, Double>>();
+  /**
+   * Output port which emits a key value pair.
+   */
+  public final transient DefaultOutputPort<KeyValPair<V, Double>> alert = new 
DefaultOutputPort<KeyValPair<V, Double>>();
 
-       /**
-        * baseValue is a state full field. It is retained across windows
-        */
-       private double baseValue = 0;
-       @Min(1)
-       private double percentThreshold = 0.0;
+  /**
+   * baseValue is a state full field. It is retained across windows
+   */
+  private double baseValue = 0;
+  @Min(1)
+  private double percentThreshold = 0.0;
 
-       /**
-        * getter function for threshold value
-        *
-        * @return threshold value
-        */
-       @Min(1)
-       public double getPercentThreshold()
-       {
-               return percentThreshold;
-       }
+  /**
+   * getter function for threshold value
+   *
+   * @return threshold value
+   */
+  @Min(1)
+  public double getPercentThreshold()
+  {
+    return percentThreshold;
+  }
 
-       /**
-        * setter function for threshold value
-        */
-       public void setPercentThreshold(double d)
-       {
-               percentThreshold = d;
-       }
+  /**
+   * setter function for threshold value
+   */
+  public void setPercentThreshold(double d)
+  {
+    percentThreshold = d;
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/3735316e/library/src/main/java/com/datatorrent/lib/math/ChangeAlertKeyVal.java
----------------------------------------------------------------------
diff --git 
a/library/src/main/java/com/datatorrent/lib/math/ChangeAlertKeyVal.java 
b/library/src/main/java/com/datatorrent/lib/math/ChangeAlertKeyVal.java
index 43e098f..b0d2e77 100644
--- a/library/src/main/java/com/datatorrent/lib/math/ChangeAlertKeyVal.java
+++ b/library/src/main/java/com/datatorrent/lib/math/ChangeAlertKeyVal.java
@@ -52,78 +52,78 @@ import com.datatorrent.lib.util.KeyValPair;
  * @since 0.3.3
  */
 public class ChangeAlertKeyVal<K, V extends Number> extends
-               BaseNumberKeyValueOperator<K, V>
+    BaseNumberKeyValueOperator<K, V>
 {
-       /**
-        * Base map is a StateFull field. It is retained across windows
-        */
-       private HashMap<K, MutableDouble> basemap = new HashMap<K, 
MutableDouble>();
+  /**
+   * Base map is a StateFull field. It is retained across windows
+   */
+  private HashMap<K, MutableDouble> basemap = new HashMap<K, MutableDouble>();
 
-       /**
-        * Input data port that takes a key value pair.
-        */
-       public final transient DefaultInputPort<KeyValPair<K, V>> data = new 
DefaultInputPort<KeyValPair<K, V>>()
-       {
-               /**
-                * Process each key, compute change or percent, and emit it.
-                */
-               @Override
-               public void process(KeyValPair<K, V> tuple)
-               {
-                       K key = tuple.getKey();
-                       double tval = tuple.getValue().doubleValue();
-                       MutableDouble val = basemap.get(key);
-                       if (!doprocessKey(key)) {
-                               return;
-                       }
-                       if (val == null) { // Only process keys that are in the 
basemap
-                               val = new MutableDouble(tval);
-                               basemap.put(cloneKey(key), val);
-                               return;
-                       }
-                       double change = tval - val.doubleValue();
-                       double percent = (change / val.doubleValue()) * 100;
-                       if (percent < 0.0) {
-                               percent = 0.0 - percent;
-                       }
-                       if (percent > percentThreshold) {
-                               KeyValPair<V, Double> dmap = new KeyValPair<V, 
Double>(
-                                               cloneValue(tuple.getValue()), 
percent);
-                               KeyValPair<K, KeyValPair<V, Double>> otuple = 
new KeyValPair<K, KeyValPair<V, Double>>(
-                                               cloneKey(key), dmap);
-                               alert.emit(otuple);
-                       }
-                       val.setValue(tval);
-               }
-       };
+  /**
+   * Input data port that takes a key value pair.
+   */
+  public final transient DefaultInputPort<KeyValPair<K, V>> data = new 
DefaultInputPort<KeyValPair<K, V>>()
+  {
+    /**
+     * Process each key, compute change or percent, and emit it.
+     */
+    @Override
+    public void process(KeyValPair<K, V> tuple)
+    {
+      K key = tuple.getKey();
+      double tval = tuple.getValue().doubleValue();
+      MutableDouble val = basemap.get(key);
+      if (!doprocessKey(key)) {
+        return;
+      }
+      if (val == null) { // Only process keys that are in the basemap
+        val = new MutableDouble(tval);
+        basemap.put(cloneKey(key), val);
+        return;
+      }
+      double change = tval - val.doubleValue();
+      double percent = (change / val.doubleValue()) * 100;
+      if (percent < 0.0) {
+        percent = 0.0 - percent;
+      }
+      if (percent > percentThreshold) {
+        KeyValPair<V, Double> dmap = new KeyValPair<V, Double>(
+            cloneValue(tuple.getValue()), percent);
+        KeyValPair<K, KeyValPair<V, Double>> otuple = new KeyValPair<K, 
KeyValPair<V, Double>>(
+            cloneKey(key), dmap);
+        alert.emit(otuple);
+      }
+      val.setValue(tval);
+    }
+  };
 
-       /**
-        * Key,Percent Change output port.
-        */
-       public final transient DefaultOutputPort<KeyValPair<K, KeyValPair<V, 
Double>>> alert = new DefaultOutputPort<KeyValPair<K, KeyValPair<V, Double>>>();
+  /**
+   * Key,Percent Change output port.
+   */
+  public final transient DefaultOutputPort<KeyValPair<K, KeyValPair<V, 
Double>>> alert = new DefaultOutputPort<KeyValPair<K, KeyValPair<V, Double>>>();
 
-       /**
-        * Alert thresh hold percentage set by application.
-        */
-       @Min(1)
-       private double percentThreshold = 0.0;
+  /**
+   * Alert thresh hold percentage set by application.
+   */
+  @Min(1)
+  private double percentThreshold = 0.0;
 
-       /**
-        * getter function for threshold value
-        *
-        * @return threshold value
-        */
-       @Min(1)
-       public double getPercentThreshold()
-       {
-               return percentThreshold;
-       }
+  /**
+   * getter function for threshold value
+   *
+   * @return threshold value
+   */
+  @Min(1)
+  public double getPercentThreshold()
+  {
+    return percentThreshold;
+  }
 
-       /**
-        * setter function for threshold value
-        */
-       public void setPercentThreshold(double d)
-       {
-               percentThreshold = d;
-       }
+  /**
+   * setter function for threshold value
+   */
+  public void setPercentThreshold(double d)
+  {
+    percentThreshold = d;
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/3735316e/library/src/main/java/com/datatorrent/lib/math/ChangeAlertMap.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/com/datatorrent/lib/math/ChangeAlertMap.java 
b/library/src/main/java/com/datatorrent/lib/math/ChangeAlertMap.java
index ebf16d1..e212a2d 100644
--- a/library/src/main/java/com/datatorrent/lib/math/ChangeAlertMap.java
+++ b/library/src/main/java/com/datatorrent/lib/math/ChangeAlertMap.java
@@ -74,7 +74,7 @@ public class ChangeAlertMap<K, V extends Number> extends 
BaseNumberKeyValueOpera
           continue;
         }
         double change = e.getValue().doubleValue() - val.doubleValue();
-        double percent = (change/val.doubleValue())*100;
+        double percent = (change / val.doubleValue()) * 100;
         if (percent < 0.0) {
           percent = 0.0 - percent;
         }

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/3735316e/library/src/main/java/com/datatorrent/lib/math/ChangeKeyVal.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/com/datatorrent/lib/math/ChangeKeyVal.java 
b/library/src/main/java/com/datatorrent/lib/math/ChangeKeyVal.java
index 2e406ec..3f77052 100644
--- a/library/src/main/java/com/datatorrent/lib/math/ChangeKeyVal.java
+++ b/library/src/main/java/com/datatorrent/lib/math/ChangeKeyVal.java
@@ -54,8 +54,7 @@ import com.datatorrent.lib.util.KeyValPair;
  * @tags change, key value
  * @since 0.3.3
  */
-public class ChangeKeyVal<K, V extends Number> extends
-  BaseNumberKeyValueOperator<K, V>
+public class ChangeKeyVal<K, V extends Number> extends 
BaseNumberKeyValueOperator<K, V>
 {
   /**
    * basemap is a stateful field. It is retained across windows
@@ -81,8 +80,7 @@ public class ChangeKeyVal<K, V extends Number> extends
       if (bval != null) { // Only process keys that are in the basemap
         double cval = tuple.getValue().doubleValue() - bval.doubleValue();
         change.emit(new KeyValPair<K, V>(cloneKey(key), getValue(cval)));
-        percent.emit(new KeyValPair<K, Double>(cloneKey(key), (cval / bval
-          .doubleValue()) * 100));
+        percent.emit(new KeyValPair<K, Double>(cloneKey(key), (cval / 
bval.doubleValue()) * 100));
       }
     }
   };

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/3735316e/library/src/main/java/com/datatorrent/lib/math/CompareExceptMap.java
----------------------------------------------------------------------
diff --git 
a/library/src/main/java/com/datatorrent/lib/math/CompareExceptMap.java 
b/library/src/main/java/com/datatorrent/lib/math/CompareExceptMap.java
index 0573e3e..66bd7da 100644
--- a/library/src/main/java/com/datatorrent/lib/math/CompareExceptMap.java
+++ b/library/src/main/java/com/datatorrent/lib/math/CompareExceptMap.java
@@ -18,13 +18,14 @@
  */
 package com.datatorrent.lib.math;
 
+import java.util.HashMap;
+import java.util.Map;
+
 import com.datatorrent.api.DefaultOutputPort;
 import com.datatorrent.api.annotation.OutputPortFieldAnnotation;
 import com.datatorrent.api.annotation.Stateless;
 import com.datatorrent.lib.algo.MatchMap;
 import com.datatorrent.lib.util.UnifierHashMap;
-import java.util.HashMap;
-import java.util.Map;
 
 /**
  * Operator compares based on the property "key", "value", and "compare".
@@ -86,13 +87,13 @@ public class CompareExceptMap<K, V extends Number> extends 
MatchMap<K, V>
   /**
    * Output port that emits a hashmap of matched tuples after comparison.
    */
-  @OutputPortFieldAnnotation(optional=true)
+  @OutputPortFieldAnnotation(optional = true)
   public final transient DefaultOutputPort<HashMap<K, V>> compare = match;
   
   /**
    * Output port that emits a hashmap of non matching tuples after comparison.
    */
-  @OutputPortFieldAnnotation(optional=true)
+  @OutputPortFieldAnnotation(optional = true)
   public final transient DefaultOutputPort<HashMap<K, V>> except = new 
DefaultOutputPort<HashMap<K, V>>()
   {
     @Override

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/3735316e/library/src/main/java/com/datatorrent/lib/math/CompareMap.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/com/datatorrent/lib/math/CompareMap.java 
b/library/src/main/java/com/datatorrent/lib/math/CompareMap.java
index 540f756..3636207 100644
--- a/library/src/main/java/com/datatorrent/lib/math/CompareMap.java
+++ b/library/src/main/java/com/datatorrent/lib/math/CompareMap.java
@@ -18,10 +18,11 @@
  */
 package com.datatorrent.lib.math;
 
+import java.util.HashMap;
+
 import com.datatorrent.api.DefaultOutputPort;
 import com.datatorrent.api.annotation.Stateless;
 import com.datatorrent.lib.algo.MatchMap;
-import java.util.HashMap;
 
 /**
  * This operator compares tuples subclassed from Number based on the property 
"key", "value", and "cmp", and matching tuples are emitted.
@@ -78,8 +79,8 @@ import java.util.HashMap;
 @Stateless
 public class CompareMap<K, V extends Number> extends MatchMap<K,V>
 {
-    /**
-     * Output port that emits a hashmap of matching number tuples after 
comparison.
-     */
-    public final transient DefaultOutputPort<HashMap<K, V>> compare = match;
+  /**
+   * Output port that emits a hashmap of matching number tuples after 
comparison.
+   */
+  public final transient DefaultOutputPort<HashMap<K, V>> compare = match;
 }

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/3735316e/library/src/main/java/com/datatorrent/lib/math/CountKeyVal.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/com/datatorrent/lib/math/CountKeyVal.java 
b/library/src/main/java/com/datatorrent/lib/math/CountKeyVal.java
index 64c5029..d593020 100644
--- a/library/src/main/java/com/datatorrent/lib/math/CountKeyVal.java
+++ b/library/src/main/java/com/datatorrent/lib/math/CountKeyVal.java
@@ -50,65 +50,65 @@ import com.datatorrent.lib.util.UnifierCountOccurKey;
 public class CountKeyVal<K, V> extends BaseKeyValueOperator<K, V>
 {
 
-       /**
-        * Key occurrence count map.
-        */
-       protected HashMap<K, MutableInt> counts = new HashMap<K, MutableInt>();
+  /**
+   * Key occurrence count map.
+   */
+  protected HashMap<K, MutableInt> counts = new HashMap<K, MutableInt>();
 
-       /**
-        * Input data port that takes key value pair.
-        */
-       public final transient DefaultInputPort<KeyValPair<K, V>> data = new 
DefaultInputPort<KeyValPair<K, V>>()
-       {
-               /**
-                * For each tuple (a key value pair): Adds the values for each 
key, Counts
-                * the number of occurrence of each key
-                */
-               @Override
-               public void process(KeyValPair<K, V> tuple)
-               {
-                       K key = tuple.getKey();
-                       MutableInt count = counts.get(key);
-                       if (count == null) {
-                               count = new MutableInt(0);
-                               counts.put(cloneKey(key), count);
-                       }
-                       count.increment();
-               }
+  /**
+   * Input data port that takes key value pair.
+   */
+  public final transient DefaultInputPort<KeyValPair<K, V>> data = new 
DefaultInputPort<KeyValPair<K, V>>()
+  {
+    /**
+     * For each tuple (a key value pair): Adds the values for each key, Counts
+     * the number of occurrence of each key
+     */
+    @Override
+    public void process(KeyValPair<K, V> tuple)
+    {
+      K key = tuple.getKey();
+      MutableInt count = counts.get(key);
+      if (count == null) {
+        count = new MutableInt(0);
+        counts.put(cloneKey(key), count);
+      }
+      count.increment();
+    }
 
-               @Override
-               public StreamCodec<KeyValPair<K, V>> getStreamCodec()
-               {
-                       return getKeyValPairStreamCodec();
-               }
-       };
+    @Override
+    public StreamCodec<KeyValPair<K, V>> getStreamCodec()
+    {
+      return getKeyValPairStreamCodec();
+    }
+  };
 
-       /**
-        * Key, occurrence value pair output port.
-        */
-       @OutputPortFieldAnnotation(optional = true)
-       public final transient DefaultOutputPort<KeyValPair<K, Integer>> count 
= new DefaultOutputPort<KeyValPair<K, Integer>>()
-       {
-               @Override
-               public UnifierCountOccurKey<K> getUnifier()
-               {
-                       return new UnifierCountOccurKey<K>();
-               }
-       };
+  /**
+   * Key, occurrence value pair output port.
+   */
+  @OutputPortFieldAnnotation(optional = true)
+  public final transient DefaultOutputPort<KeyValPair<K, Integer>> count = new 
DefaultOutputPort<KeyValPair<K, Integer>>()
+  {
+    @Override
+    public UnifierCountOccurKey<K> getUnifier()
+    {
+      return new UnifierCountOccurKey<K>();
+    }
+  };
 
-       /**
-        * Emits on all ports that are connected. Data is computed during 
process on
-        * input port and endWindow just emits it for each key. Clears the 
internal
-        * data if resetAtEndWindow is true.
-        */
-       @SuppressWarnings({ "unchecked", "rawtypes" })
-       @Override
-       public void endWindow()
-       {
-               for (Map.Entry<K, MutableInt> e : counts.entrySet()) {
-                       count.emit(new KeyValPair(e.getKey(),
-                                       new Integer(e.getValue().intValue())));
-               }
-               counts.clear();
-       }
+  /**
+   * Emits on all ports that are connected. Data is computed during process on
+   * input port and endWindow just emits it for each key. Clears the internal
+   * data if resetAtEndWindow is true.
+   */
+  @SuppressWarnings({ "unchecked", "rawtypes" })
+  @Override
+  public void endWindow()
+  {
+    for (Map.Entry<K, MutableInt> e : counts.entrySet()) {
+      count.emit(new KeyValPair(e.getKey(),
+          new Integer(e.getValue().intValue())));
+    }
+    counts.clear();
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/3735316e/library/src/main/java/com/datatorrent/lib/math/Division.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/com/datatorrent/lib/math/Division.java 
b/library/src/main/java/com/datatorrent/lib/math/Division.java
index 5bbb9a9..d05af18 100644
--- a/library/src/main/java/com/datatorrent/lib/math/Division.java
+++ b/library/src/main/java/com/datatorrent/lib/math/Division.java
@@ -20,10 +20,10 @@ package com.datatorrent.lib.math;
 
 import java.util.ArrayList;
 
-import com.datatorrent.common.util.BaseOperator;
 import com.datatorrent.api.DefaultInputPort;
 import com.datatorrent.api.DefaultOutputPort;
 import com.datatorrent.api.annotation.OutputPortFieldAnnotation;
+import com.datatorrent.common.util.BaseOperator;
 
 /**
  * This operator does division metric on consecutive tuples on ports.
@@ -54,9 +54,9 @@ import 
com.datatorrent.api.annotation.OutputPortFieldAnnotation;
  */
 public class Division extends BaseOperator
 {
-       /**
-        * Array to store numerator inputs during window.
-        */
+  /**
+   * Array to store numerator inputs during window.
+   */
   private ArrayList<Number> numer = new ArrayList<Number>();
   
   /**
@@ -83,7 +83,7 @@ public class Division extends BaseOperator
         if (loc > numer.size()) {
           loc = numer.size();
         }
-        emit(numer.get(loc-1), denom.get(loc-1));
+        emit(numer.get(loc - 1), denom.get(loc - 1));
         index++;
       }
     }
@@ -107,7 +107,7 @@ public class Division extends BaseOperator
         if (loc > numer.size()) {
           loc = numer.size();
         }
-        emit(numer.get(loc-1), denom.get(loc-1));
+        emit(numer.get(loc - 1), denom.get(loc - 1));
         index++;
       }
     }

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/3735316e/library/src/main/java/com/datatorrent/lib/math/ExceptMap.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/com/datatorrent/lib/math/ExceptMap.java 
b/library/src/main/java/com/datatorrent/lib/math/ExceptMap.java
index 84b10b8..ddef880 100644
--- a/library/src/main/java/com/datatorrent/lib/math/ExceptMap.java
+++ b/library/src/main/java/com/datatorrent/lib/math/ExceptMap.java
@@ -68,35 +68,35 @@ public class ExceptMap<K, V extends Number> extends 
MatchMap<K, V>
         /**
          * Output port that emits non matching number tuples.
          */
-       public final transient DefaultOutputPort<HashMap<K, V>> except = new 
DefaultOutputPort<HashMap<K, V>>()
-       {
-               @Override
-               public Unifier<HashMap<K, V>> getUnifier()
-               {
-                       return new UnifierHashMap<K, V>();
-               }
-       };
+  public final transient DefaultOutputPort<HashMap<K, V>> except = new 
DefaultOutputPort<HashMap<K, V>>()
+  {
+    @Override
+    public Unifier<HashMap<K, V>> getUnifier()
+    {
+      return new UnifierHashMap<K, V>();
+    }
+  };
 
-       /**
-        * Does nothing. Overrides base as call super.tupleMatched() would emit 
the
-        * tuple
-        * 
-        * @param tuple
-        */
-       @Override
-       public void tupleMatched(Map<K, V> tuple)
-       {
-       }
+  /**
+   * Does nothing. Overrides base as call super.tupleMatched() would emit the
+   * tuple
+   *
+   * @param tuple
+   */
+  @Override
+  public void tupleMatched(Map<K, V> tuple)
+  {
+  }
 
-       /**
-        * Emits the tuple. Calls cloneTuple to get a copy, allowing users to 
override
-        * in case objects are mutable
-        * 
-        * @param tuple
-        */
-       @Override
-       public void tupleNotMatched(Map<K, V> tuple)
-       {
-               except.emit(cloneTuple(tuple));
-       }
+  /**
+   * Emits the tuple. Calls cloneTuple to get a copy, allowing users to 
override
+   * in case objects are mutable
+   *
+   * @param tuple
+   */
+  @Override
+  public void tupleNotMatched(Map<K, V> tuple)
+  {
+    except.emit(cloneTuple(tuple));
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/3735316e/library/src/main/java/com/datatorrent/lib/math/LogicalCompare.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/com/datatorrent/lib/math/LogicalCompare.java 
b/library/src/main/java/com/datatorrent/lib/math/LogicalCompare.java
index 0b7e036..41ce5a0 100644
--- a/library/src/main/java/com/datatorrent/lib/math/LogicalCompare.java
+++ b/library/src/main/java/com/datatorrent/lib/math/LogicalCompare.java
@@ -18,10 +18,10 @@
  */
 package com.datatorrent.lib.math;
 
-import com.datatorrent.common.util.BaseOperator;
 import com.datatorrent.api.DefaultInputPort;
 import com.datatorrent.api.DefaultOutputPort;
 import com.datatorrent.api.annotation.Stateless;
+import com.datatorrent.common.util.BaseOperator;
 import com.datatorrent.common.util.Pair;
 
 /**
@@ -29,7 +29,7 @@ import com.datatorrent.common.util.Pair;
  * <p>
  * If the first value is equal to second value, then the pair is emitted on 
equalTo, greaterThanEqualTo, and lessThanEqualTo ports.
  * If the first value is less than second value, then the pair is emitted on 
notEqualTo, lessThan and lessThanEqualTo ports.
- * If the first value is greater than second value, then the pair is emitted 
on notEqualTo, greaterThan and greaterThanEqualTo ports. 
+ * If the first value is greater than second value, then the pair is emitted 
on notEqualTo, greaterThan and greaterThanEqualTo ports.
  * This is a pass through operator.
  * <br>
  * StateFull : No, output is computed during current window. <br>
@@ -51,61 +51,61 @@ import com.datatorrent.common.util.Pair;
  */
 @Stateless
 public abstract class LogicalCompare<T extends Comparable<? super T>> extends
-               BaseOperator
+    BaseOperator
 {
-       /**
-        * Input port that takes a key, value pair for comparison.
-        */
-       public final transient DefaultInputPort<Pair<T, T>> input = new 
DefaultInputPort<Pair<T, T>>()
-       {
-               @Override
-               public void process(Pair<T, T> tuple)
-               {
-                       int i = tuple.first.compareTo(tuple.second);
-                       if (i > 0) {
-                               greaterThan.emit(tuple);
-                               greaterThanOrEqualTo.emit(tuple);
-                               notEqualTo.emit(tuple);
-                       } else if (i < 0) {
-                               lessThan.emit(tuple);
-                               lessThanOrEqualTo.emit(tuple);
-                               notEqualTo.emit(tuple);
-                       } else {
-                               equalTo.emit(tuple);
-                               lessThanOrEqualTo.emit(tuple);
-                               greaterThanOrEqualTo.emit(tuple);
-                       }
-               }
+  /**
+   * Input port that takes a key, value pair for comparison.
+   */
+  public final transient DefaultInputPort<Pair<T, T>> input = new 
DefaultInputPort<Pair<T, T>>()
+  {
+    @Override
+    public void process(Pair<T, T> tuple)
+    {
+      int i = tuple.first.compareTo(tuple.second);
+      if (i > 0) {
+        greaterThan.emit(tuple);
+        greaterThanOrEqualTo.emit(tuple);
+        notEqualTo.emit(tuple);
+      } else if (i < 0) {
+        lessThan.emit(tuple);
+        lessThanOrEqualTo.emit(tuple);
+        notEqualTo.emit(tuple);
+      } else {
+        equalTo.emit(tuple);
+        lessThanOrEqualTo.emit(tuple);
+        greaterThanOrEqualTo.emit(tuple);
+      }
+    }
 
-       };
+  };
 
-       /**
-        * Equal output port.
-        */
-       public final transient DefaultOutputPort<Pair<T, T>> equalTo = new 
DefaultOutputPort<Pair<T, T>>();
+  /**
+   * Equal output port.
+   */
+  public final transient DefaultOutputPort<Pair<T, T>> equalTo = new 
DefaultOutputPort<Pair<T, T>>();
 
-       /**
-        * Not Equal output port.
-        */
-       public final transient DefaultOutputPort<Pair<T, T>> notEqualTo = new 
DefaultOutputPort<Pair<T, T>>();
+  /**
+   * Not Equal output port.
+   */
+  public final transient DefaultOutputPort<Pair<T, T>> notEqualTo = new 
DefaultOutputPort<Pair<T, T>>();
 
-       /**
-        * Less than output port.
-        */
-       public final transient DefaultOutputPort<Pair<T, T>> lessThan = new 
DefaultOutputPort<Pair<T, T>>();
+  /**
+   * Less than output port.
+   */
+  public final transient DefaultOutputPort<Pair<T, T>> lessThan = new 
DefaultOutputPort<Pair<T, T>>();
 
-       /**
-        * Greater than output port.
-        */
-       public final transient DefaultOutputPort<Pair<T, T>> greaterThan = new 
DefaultOutputPort<Pair<T, T>>();
+  /**
+   * Greater than output port.
+   */
+  public final transient DefaultOutputPort<Pair<T, T>> greaterThan = new 
DefaultOutputPort<Pair<T, T>>();
 
-       /**
-        * Less than equal to output port.
-        */
-       public final transient DefaultOutputPort<Pair<T, T>> lessThanOrEqualTo 
= new DefaultOutputPort<Pair<T, T>>();
+  /**
+   * Less than equal to output port.
+   */
+  public final transient DefaultOutputPort<Pair<T, T>> lessThanOrEqualTo = new 
DefaultOutputPort<Pair<T, T>>();
 
-       /**
-        * Greater than equal to output port.
-        */
-       public final transient DefaultOutputPort<Pair<T, T>> 
greaterThanOrEqualTo = new DefaultOutputPort<Pair<T, T>>();
+  /**
+   * Greater than equal to output port.
+   */
+  public final transient DefaultOutputPort<Pair<T, T>> greaterThanOrEqualTo = 
new DefaultOutputPort<Pair<T, T>>();
 }

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/3735316e/library/src/main/java/com/datatorrent/lib/math/LogicalCompareToConstant.java
----------------------------------------------------------------------
diff --git 
a/library/src/main/java/com/datatorrent/lib/math/LogicalCompareToConstant.java 
b/library/src/main/java/com/datatorrent/lib/math/LogicalCompareToConstant.java
index 5e98eae..659a287 100644
--- 
a/library/src/main/java/com/datatorrent/lib/math/LogicalCompareToConstant.java
+++ 
b/library/src/main/java/com/datatorrent/lib/math/LogicalCompareToConstant.java
@@ -18,10 +18,10 @@
  */
 package com.datatorrent.lib.math;
 
-import com.datatorrent.common.util.BaseOperator;
 import com.datatorrent.api.DefaultInputPort;
 import com.datatorrent.api.DefaultOutputPort;
 import com.datatorrent.api.annotation.Stateless;
+import com.datatorrent.common.util.BaseOperator;
 
 /**
  * This operator does a logical comparison of a constant with a tuple.
@@ -54,78 +54,78 @@ import com.datatorrent.api.annotation.Stateless;
  */
 @Stateless
 public class LogicalCompareToConstant<T extends Comparable<? super T>> extends
-               BaseOperator
+    BaseOperator
 {
 
-       /**
-        * Compare constant, set by application.
-        */
-       private T constant;
+  /**
+   * Compare constant, set by application.
+   */
+  private T constant;
 
-       /**
-        * Input port that takes a comparable to compare it with a constant.
-        */
-       public final transient DefaultInputPort<T> input = new 
DefaultInputPort<T>()
-       {
-               @Override
-               public void process(T tuple)
-               {
-                       int i = constant.compareTo(tuple);
-                       if (i > 0) {
-                               greaterThan.emit(tuple);
-                               greaterThanOrEqualTo.emit(tuple);
-                               notEqualTo.emit(tuple);
-                       } else if (i < 0) {
-                               lessThan.emit(tuple);
-                               lessThanOrEqualTo.emit(tuple);
-                               notEqualTo.emit(tuple);
-                       } else {
-                               equalTo.emit(tuple);
-                               lessThanOrEqualTo.emit(tuple);
-                               greaterThanOrEqualTo.emit(tuple);
-                       }
-               }
+  /**
+   * Input port that takes a comparable to compare it with a constant.
+   */
+  public final transient DefaultInputPort<T> input = new DefaultInputPort<T>()
+  {
+    @Override
+    public void process(T tuple)
+    {
+      int i = constant.compareTo(tuple);
+      if (i > 0) {
+        greaterThan.emit(tuple);
+        greaterThanOrEqualTo.emit(tuple);
+        notEqualTo.emit(tuple);
+      } else if (i < 0) {
+        lessThan.emit(tuple);
+        lessThanOrEqualTo.emit(tuple);
+        notEqualTo.emit(tuple);
+      } else {
+        equalTo.emit(tuple);
+        lessThanOrEqualTo.emit(tuple);
+        greaterThanOrEqualTo.emit(tuple);
+      }
+    }
 
-       };
+  };
 
-       /**
-        * Equal output port.
-        */
-       public final transient DefaultOutputPort<T> equalTo = new 
DefaultOutputPort<T>();
+  /**
+   * Equal output port.
+   */
+  public final transient DefaultOutputPort<T> equalTo = new 
DefaultOutputPort<T>();
 
-       /**
-        * Not Equal output port.
-        */
-       public final transient DefaultOutputPort<T> notEqualTo = new 
DefaultOutputPort<T>();
+  /**
+   * Not Equal output port.
+   */
+  public final transient DefaultOutputPort<T> notEqualTo = new 
DefaultOutputPort<T>();
 
-       /**
-        * Less Than output port.
-        */
-       public final transient DefaultOutputPort<T> lessThan = new 
DefaultOutputPort<T>();
+  /**
+   * Less Than output port.
+   */
+  public final transient DefaultOutputPort<T> lessThan = new 
DefaultOutputPort<T>();
 
-       /**
-        * Greater than output port.
-        */
-       public final transient DefaultOutputPort<T> greaterThan = new 
DefaultOutputPort<T>();
-       public final transient DefaultOutputPort<T> lessThanOrEqualTo = new 
DefaultOutputPort<T>();
-       public final transient DefaultOutputPort<T> greaterThanOrEqualTo = new 
DefaultOutputPort<T>();
+  /**
+   * Greater than output port.
+   */
+  public final transient DefaultOutputPort<T> greaterThan = new 
DefaultOutputPort<T>();
+  public final transient DefaultOutputPort<T> lessThanOrEqualTo = new 
DefaultOutputPort<T>();
+  public final transient DefaultOutputPort<T> greaterThanOrEqualTo = new 
DefaultOutputPort<T>();
 
-       /**
-        * Set constant for comparison.
-        * 
-        * @param constant
-        *          the constant to set
-        */
-       public void setConstant(T constant)
-       {
-               this.constant = constant;
-       }
+  /**
+   * Set constant for comparison.
+   *
+   * @param constant
+   *          the constant to set
+   */
+  public void setConstant(T constant)
+  {
+    this.constant = constant;
+  }
 
-       /**
-        * returns the value of constant
-        */
-       public T getConstant()
-       {
-               return constant;
-       }
+  /**
+   * returns the value of constant
+   */
+  public T getConstant()
+  {
+    return constant;
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/3735316e/library/src/main/java/com/datatorrent/lib/math/Margin.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/com/datatorrent/lib/math/Margin.java 
b/library/src/main/java/com/datatorrent/lib/math/Margin.java
index 1161ba8..94e15d6 100644
--- a/library/src/main/java/com/datatorrent/lib/math/Margin.java
+++ b/library/src/main/java/com/datatorrent/lib/math/Margin.java
@@ -50,92 +50,92 @@ import com.datatorrent.lib.util.BaseNumberValueOperator;
 @OperatorAnnotation(partitionable = false)
 public class Margin<V extends Number> extends BaseNumberValueOperator<V>
 {
-       /**
-        * Sum of numerator values.
-        */
-       protected double nval = 0.0;
+  /**
+   * Sum of numerator values.
+   */
+  protected double nval = 0.0;
 
-       /**
-        * sum of denominator values.
-        */
-       protected double dval = 0.0;
+  /**
+   * sum of denominator values.
+   */
+  protected double dval = 0.0;
 
-       /**
-        * Flag to output margin as percentage.
-        */
-       protected boolean percent = false;
+  /**
+   * Flag to output margin as percentage.
+   */
+  protected boolean percent = false;
 
-       /**
-        * Numerator input port.
-        */
-       public final transient DefaultInputPort<V> numerator = new 
DefaultInputPort<V>()
-       {
-               /**
-                * Adds to the numerator value
-                */
-               @Override
-               public void process(V tuple)
-               {
-                       nval += tuple.doubleValue();
-               }
-       };
+  /**
+   * Numerator input port.
+   */
+  public final transient DefaultInputPort<V> numerator = new 
DefaultInputPort<V>()
+  {
+    /**
+     * Adds to the numerator value
+     */
+    @Override
+    public void process(V tuple)
+    {
+      nval += tuple.doubleValue();
+    }
+  };
 
-       /**
-        * Denominator input port.
-        */
-       public final transient DefaultInputPort<V> denominator = new 
DefaultInputPort<V>()
-       {
-               /**
-                * Adds to the denominator value
-                */
-               @Override
-               public void process(V tuple)
-               {
-                       dval += tuple.doubleValue();
-               }
-       };
+  /**
+   * Denominator input port.
+   */
+  public final transient DefaultInputPort<V> denominator = new 
DefaultInputPort<V>()
+  {
+    /**
+     * Adds to the denominator value
+     */
+    @Override
+    public void process(V tuple)
+    {
+      dval += tuple.doubleValue();
+    }
+  };
 
-       /**
-        * Output margin port.
-        */
-       public final transient DefaultOutputPort<V> margin = new 
DefaultOutputPort<V>();
+  /**
+   * Output margin port.
+   */
+  public final transient DefaultOutputPort<V> margin = new 
DefaultOutputPort<V>();
 
-       /**
-        * getter function for percent
-        * 
-        * @return percent
-        */
-       public boolean getPercent()
-       {
-               return percent;
-       }
+  /**
+   * getter function for percent
+   *
+   * @return percent
+   */
+  public boolean getPercent()
+  {
+    return percent;
+  }
 
-       /**
-        * setter function for percent
-        * 
-        * @param val
-        *          sets percent
-        */
-       public void setPercent(boolean val)
-       {
-               percent = val;
-       }
+  /**
+   * setter function for percent
+   *
+   * @param val
+   *          sets percent
+   */
+  public void setPercent(boolean val)
+  {
+    percent = val;
+  }
 
-       /**
-        * Generates tuple emits it as long as denomitor is not 0 Clears 
internal data
-        */
-       @Override
-       public void endWindow()
-       {
-               if (dval == 0) {
-                       return;
-               }
-               double val = 1 - (nval / dval);
-               if (percent) {
-                       val = val * 100;
-               }
-               margin.emit(getValue(val));
-               nval = 0.0;
-               dval = 0.0;
-       }
+  /**
+   * Generates tuple emits it as long as denomitor is not 0 Clears internal 
data
+   */
+  @Override
+  public void endWindow()
+  {
+    if (dval == 0) {
+      return;
+    }
+    double val = 1 - (nval / dval);
+    if (percent) {
+      val = val * 100;
+    }
+    margin.emit(getValue(val));
+    nval = 0.0;
+    dval = 0.0;
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/3735316e/library/src/main/java/com/datatorrent/lib/math/MarginKeyVal.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/com/datatorrent/lib/math/MarginKeyVal.java 
b/library/src/main/java/com/datatorrent/lib/math/MarginKeyVal.java
index 29d35bc..e3af508 100644
--- a/library/src/main/java/com/datatorrent/lib/math/MarginKeyVal.java
+++ b/library/src/main/java/com/datatorrent/lib/math/MarginKeyVal.java
@@ -23,12 +23,11 @@ import java.util.Map;
 
 import org.apache.commons.lang.mutable.MutableDouble;
 
-import com.datatorrent.lib.util.BaseNumberKeyValueOperator;
-import com.datatorrent.lib.util.KeyValPair;
-
 import com.datatorrent.api.DefaultInputPort;
 import com.datatorrent.api.DefaultOutputPort;
 import com.datatorrent.api.StreamCodec;
+import com.datatorrent.lib.util.BaseNumberKeyValueOperator;
+import com.datatorrent.lib.util.KeyValPair;
 
 /**
  *
@@ -52,135 +51,134 @@ import com.datatorrent.api.StreamCodec;
  * @tags sum, division, numeric, key value
  * @since 0.3.3
  */
-public class MarginKeyVal<K, V extends Number> extends
-               BaseNumberKeyValueOperator<K, V>
+public class MarginKeyVal<K, V extends Number> extends 
BaseNumberKeyValueOperator<K, V>
 {
         /**
-        * Numerator input port that takes a key value pair.
-        */
-       public final transient DefaultInputPort<KeyValPair<K, V>> numerator = 
new DefaultInputPort<KeyValPair<K, V>>()
-       {
-               /**
-                * Adds tuple to the numerator hash.
-                */
-               @Override
-               public void process(KeyValPair<K, V> tuple)
-               {
-                       addTuple(tuple, numerators);
-               }
-
-               /**
-                * Set StreamCodec used for partitioning.
-                */
-               @Override
-               public StreamCodec<KeyValPair<K, V>> getStreamCodec()
-               {
-                       return getKeyValPairStreamCodec();
-               }
-       };
+   * Numerator input port that takes a key value pair.
+   */
+  public final transient DefaultInputPort<KeyValPair<K, V>> numerator = new 
DefaultInputPort<KeyValPair<K, V>>()
+  {
+    /**
+     * Adds tuple to the numerator hash.
+     */
+    @Override
+    public void process(KeyValPair<K, V> tuple)
+    {
+      addTuple(tuple, numerators);
+    }
+
+    /**
+     * Set StreamCodec used for partitioning.
+     */
+    @Override
+    public StreamCodec<KeyValPair<K, V>> getStreamCodec()
+    {
+      return getKeyValPairStreamCodec();
+    }
+  };
 
         /**
-        * Denominator input port that takes a key value pair.
-        */
-       public final transient DefaultInputPort<KeyValPair<K, V>> denominator = 
new DefaultInputPort<KeyValPair<K, V>>()
-       {
-               /**
-                * Adds tuple to the denominator hash.
-                */
-               @Override
-               public void process(KeyValPair<K, V> tuple)
-               {
-                       addTuple(tuple, denominators);
-               }
-
-               /**
-                * Set StreamCodec used for partitioning.
-                */
-               @Override
-               public StreamCodec<KeyValPair<K, V>> getStreamCodec()
-               {
-                       return getKeyValPairStreamCodec();
-               }
-       };
-
-       /**
-        * Adds the value for each key.
-        *
-        * @param tuple
-        * @param map
-        */
-       public void addTuple(KeyValPair<K, V> tuple, Map<K, MutableDouble> map)
-       {
-               K key = tuple.getKey();
-               if (!doprocessKey(key) || (tuple.getValue() == null)) {
-                       return;
-               }
-               MutableDouble val = map.get(key);
-               if (val == null) {
-                       val = new MutableDouble(0.0);
-                       map.put(cloneKey(key), val);
-               }
-               val.add(tuple.getValue().doubleValue());
-       }
+   * Denominator input port that takes a key value pair.
+   */
+  public final transient DefaultInputPort<KeyValPair<K, V>> denominator = new 
DefaultInputPort<KeyValPair<K, V>>()
+  {
+    /**
+     * Adds tuple to the denominator hash.
+     */
+    @Override
+    public void process(KeyValPair<K, V> tuple)
+    {
+      addTuple(tuple, denominators);
+    }
+
+    /**
+     * Set StreamCodec used for partitioning.
+     */
+    @Override
+    public StreamCodec<KeyValPair<K, V>> getStreamCodec()
+    {
+      return getKeyValPairStreamCodec();
+    }
+  };
+
+  /**
+   * Adds the value for each key.
+   *
+   * @param tuple
+   * @param map
+   */
+  public void addTuple(KeyValPair<K, V> tuple, Map<K, MutableDouble> map)
+  {
+    K key = tuple.getKey();
+    if (!doprocessKey(key) || (tuple.getValue() == null)) {
+      return;
+    }
+    MutableDouble val = map.get(key);
+    if (val == null) {
+      val = new MutableDouble(0.0);
+      map.put(cloneKey(key), val);
+    }
+    val.add(tuple.getValue().doubleValue());
+  }
 
         /**
-        * Output margin port that emits Key Value pairs.
-        */
-       public final transient DefaultOutputPort<KeyValPair<K, V>> margin = new 
DefaultOutputPort<KeyValPair<K, V>>();
-
-       protected HashMap<K, MutableDouble> numerators = new HashMap<K, 
MutableDouble>();
-       protected HashMap<K, MutableDouble> denominators = new HashMap<K, 
MutableDouble>();
-       protected boolean percent = false;
-
-       /**
-        * getter function for percent
-        *
-        * @return percent
-        */
-       public boolean getPercent()
-       {
-               return percent;
-       }
-
-       /**
-        * setter function for percent
-        *
-        * @param val
-        *          sets percent
-        */
-       public void setPercent(boolean val)
-       {
-               percent = val;
-       }
-
-       /**
-        * Generates tuples for each key and emits them. Only keys that are in 
the
-        * denominator are iterated on If the key is only in the numerator, it 
gets
-        * ignored (cannot do divide by 0) Clears internal data
-        */
-       @SuppressWarnings({ "rawtypes", "unchecked" })
-       @Override
-       public void endWindow()
-       {
-               Double val;
-               for (Map.Entry<K, MutableDouble> e : denominators.entrySet()) {
-                       K key = e.getKey();
-                       MutableDouble nval = numerators.get(key);
-                       if (nval == null) {
-                               nval = new MutableDouble(0.0);
-                       } else {
-                               numerators.remove(key); // so that all left 
over keys can be reported
-                       }
-                       if (percent) {
-                               val = (1 - nval.doubleValue() / 
e.getValue().doubleValue()) * 100;
-                       } else {
-                               val = 1 - nval.doubleValue() / 
e.getValue().doubleValue();
-                       }
-
-                       margin.emit(new KeyValPair(key, 
getValue(val.doubleValue())));
-               }
-
-               numerators.clear();
-               denominators.clear();
-       }
+   * Output margin port that emits Key Value pairs.
+   */
+  public final transient DefaultOutputPort<KeyValPair<K, V>> margin = new 
DefaultOutputPort<KeyValPair<K, V>>();
+
+  protected HashMap<K, MutableDouble> numerators = new HashMap<K, 
MutableDouble>();
+  protected HashMap<K, MutableDouble> denominators = new HashMap<K, 
MutableDouble>();
+  protected boolean percent = false;
+
+  /**
+   * getter function for percent
+   *
+   * @return percent
+   */
+  public boolean getPercent()
+  {
+    return percent;
+  }
+
+  /**
+   * setter function for percent
+   *
+   * @param val
+   *          sets percent
+   */
+  public void setPercent(boolean val)
+  {
+    percent = val;
+  }
+
+  /**
+   * Generates tuples for each key and emits them. Only keys that are in the
+   * denominator are iterated on If the key is only in the numerator, it gets
+   * ignored (cannot do divide by 0) Clears internal data
+   */
+  @SuppressWarnings({ "rawtypes", "unchecked" })
+  @Override
+  public void endWindow()
+  {
+    Double val;
+    for (Map.Entry<K, MutableDouble> e : denominators.entrySet()) {
+      K key = e.getKey();
+      MutableDouble nval = numerators.get(key);
+      if (nval == null) {
+        nval = new MutableDouble(0.0);
+      } else {
+        numerators.remove(key); // so that all left over keys can be reported
+      }
+      if (percent) {
+        val = (1 - nval.doubleValue() / e.getValue().doubleValue()) * 100;
+      } else {
+        val = 1 - nval.doubleValue() / e.getValue().doubleValue();
+      }
+
+      margin.emit(new KeyValPair(key, getValue(val.doubleValue())));
+    }
+
+    numerators.clear();
+    denominators.clear();
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/3735316e/library/src/main/java/com/datatorrent/lib/math/MarginMap.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/com/datatorrent/lib/math/MarginMap.java 
b/library/src/main/java/com/datatorrent/lib/math/MarginMap.java
index 2259d85..7ef1f81 100644
--- a/library/src/main/java/com/datatorrent/lib/math/MarginMap.java
+++ b/library/src/main/java/com/datatorrent/lib/math/MarginMap.java
@@ -18,13 +18,15 @@
  */
 package com.datatorrent.lib.math;
 
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.commons.lang.mutable.MutableDouble;
+
 import com.datatorrent.api.DefaultInputPort;
 import com.datatorrent.api.DefaultOutputPort;
 import com.datatorrent.lib.util.BaseNumberKeyValueOperator;
 import com.datatorrent.lib.util.UnifierHashMap;
-import java.util.HashMap;
-import java.util.Map;
-import org.apache.commons.lang.mutable.MutableDouble;
 
 
 /**
@@ -144,18 +146,16 @@ public class MarginMap<K, V extends Number> extends 
BaseNumberKeyValueOperator<K
   {
     HashMap<K, V> tuples = new HashMap<K, V>();
     Double val;
-    for (Map.Entry<K, MutableDouble> e: denominators.entrySet()) {
+    for (Map.Entry<K, MutableDouble> e : denominators.entrySet()) {
       MutableDouble nval = numerators.get(e.getKey());
       if (nval == null) {
         nval = new MutableDouble(0.0);
-      }
-      else {
+      } else {
         numerators.remove(e.getKey()); // so that all left over keys can be 
reported
       }
       if (percent) {
         val = (1 - nval.doubleValue() / e.getValue().doubleValue()) * 100;
-      }
-      else {
+      } else {
         val = 1 - nval.doubleValue() / e.getValue().doubleValue();
       }
       tuples.put(e.getKey(), getValue(val.doubleValue()));

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/3735316e/library/src/main/java/com/datatorrent/lib/math/Max.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/com/datatorrent/lib/math/Max.java 
b/library/src/main/java/com/datatorrent/lib/math/Max.java
index e4171f6..8ec8b2f 100644
--- a/library/src/main/java/com/datatorrent/lib/math/Max.java
+++ b/library/src/main/java/com/datatorrent/lib/math/Max.java
@@ -64,8 +64,7 @@ public class Max<V extends Number> extends 
BaseNumberValueOperator<V> implements
     if (!flag) {
       high = tuple;
       flag = true;
-    }
-    else if (high.doubleValue() < tuple.doubleValue()) {
+    } else if (high.doubleValue() < tuple.doubleValue()) {
       high = tuple;
     }
   }
@@ -74,7 +73,7 @@ public class Max<V extends Number> extends 
BaseNumberValueOperator<V> implements
    * Max value output port.
    */
   public final transient DefaultOutputPort<V> max = new DefaultOutputPort<V>()
-   {
+  {
     @Override
     public Unifier<V> getUnifier()
     {

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/3735316e/library/src/main/java/com/datatorrent/lib/math/MaxKeyVal.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/com/datatorrent/lib/math/MaxKeyVal.java 
b/library/src/main/java/com/datatorrent/lib/math/MaxKeyVal.java
index 58a947f..95a0a08 100644
--- a/library/src/main/java/com/datatorrent/lib/math/MaxKeyVal.java
+++ b/library/src/main/java/com/datatorrent/lib/math/MaxKeyVal.java
@@ -21,12 +21,11 @@ package com.datatorrent.lib.math;
 import java.util.HashMap;
 import java.util.Map;
 
-import com.datatorrent.lib.util.BaseNumberKeyValueOperator;
-import com.datatorrent.lib.util.KeyValPair;
-
 import com.datatorrent.api.DefaultInputPort;
 import com.datatorrent.api.DefaultOutputPort;
 import com.datatorrent.api.StreamCodec;
+import com.datatorrent.lib.util.BaseNumberKeyValueOperator;
+import com.datatorrent.lib.util.KeyValPair;
 
 /**
  *
@@ -68,8 +67,7 @@ public class MaxKeyVal<K, V extends Number> extends 
BaseNumberKeyValueOperator<K
       if (val == null) {
         val = tval;
         highs.put(cloneKey(key), val);
-      }
-      else if (val.doubleValue() < tval.doubleValue()) {
+      } else if (val.doubleValue() < tval.doubleValue()) {
         highs.put(key, tval);
       }
     }
@@ -97,7 +95,7 @@ public class MaxKeyVal<K, V extends Number> extends 
BaseNumberKeyValueOperator<K
    * Clears internal data. Node only works in windowed mode.
    */
   @SuppressWarnings({ "rawtypes", "unchecked" })
-       @Override
+  @Override
   public void endWindow()
   {
     if (!highs.isEmpty()) {

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/3735316e/library/src/main/java/com/datatorrent/lib/math/Min.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/com/datatorrent/lib/math/Min.java 
b/library/src/main/java/com/datatorrent/lib/math/Min.java
index 244d990..4b3fa23 100644
--- a/library/src/main/java/com/datatorrent/lib/math/Min.java
+++ b/library/src/main/java/com/datatorrent/lib/math/Min.java
@@ -41,17 +41,17 @@ import com.datatorrent.lib.util.BaseNumberValueOperator;
  */
 public class Min<V extends Number> extends BaseNumberValueOperator<V> 
implements Unifier<V>
 {
-       /**
-        * Computed low value.
-        */
+  /**
+   * Computed low value.
+   */
   protected V low;
   
   // transient field
   protected boolean flag = false;
   
-        /**
-          * Input port that takes a number and compares to min and stores the 
new min.
-          */
+  /**
+   * Input port that takes a number and compares to min and stores the new min.
+   */
   public final transient DefaultInputPort<V> data = new DefaultInputPort<V>()
   {
     /**
@@ -73,8 +73,7 @@ public class Min<V extends Number> extends 
BaseNumberValueOperator<V> implements
     if (!flag) {
       low = tuple;
       flag = true;
-    }
-    else if (low.doubleValue() > tuple.doubleValue()) {
+    } else if (low.doubleValue() > tuple.doubleValue()) {
       low = tuple;
     }
   }

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/3735316e/library/src/main/java/com/datatorrent/lib/math/MinKeyVal.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/com/datatorrent/lib/math/MinKeyVal.java 
b/library/src/main/java/com/datatorrent/lib/math/MinKeyVal.java
index 5ea710b..2468239 100644
--- a/library/src/main/java/com/datatorrent/lib/math/MinKeyVal.java
+++ b/library/src/main/java/com/datatorrent/lib/math/MinKeyVal.java
@@ -21,12 +21,11 @@ package com.datatorrent.lib.math;
 import java.util.HashMap;
 import java.util.Map;
 
-import com.datatorrent.lib.util.BaseNumberKeyValueOperator;
-import com.datatorrent.lib.util.KeyValPair;
-
 import com.datatorrent.api.DefaultInputPort;
 import com.datatorrent.api.DefaultOutputPort;
 import com.datatorrent.api.StreamCodec;
+import com.datatorrent.lib.util.BaseNumberKeyValueOperator;
+import com.datatorrent.lib.util.KeyValPair;
 
 /**
  *
@@ -48,9 +47,9 @@ import com.datatorrent.api.StreamCodec;
  */
 public class MinKeyVal<K, V extends Number> extends 
BaseNumberKeyValueOperator<K, V>
 {
-       /**
-        * Input port which takes a key vaue pair and updates the value for 
each key if there is a new min.
-        */
+  /**
+   * Input port which takes a key vaue pair and updates the value for each key 
if there is a new min.
+   */
   public final transient DefaultInputPort<KeyValPair<K, V>> data = new 
DefaultInputPort<KeyValPair<K, V>>()
   {
     /**
@@ -67,8 +66,7 @@ public class MinKeyVal<K, V extends Number> extends 
BaseNumberKeyValueOperator<K
       V val = mins.get(key);
       if (val == null) {
         mins.put(cloneKey(key), tval);
-      }
-      else if (val.doubleValue() > tval.doubleValue()) {
+      } else if (val.doubleValue() > tval.doubleValue()) {
         mins.put(key, tval);
       }
     }
@@ -94,7 +92,7 @@ public class MinKeyVal<K, V extends Number> extends 
BaseNumberKeyValueOperator<K
    * Clears internal data. Node only works in windowed mode.
    */
   @SuppressWarnings({ "unchecked", "rawtypes" })
-       @Override
+  @Override
   public void endWindow()
   {
     if (!mins.isEmpty()) {

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/3735316e/library/src/main/java/com/datatorrent/lib/math/MultiplyByConstant.java
----------------------------------------------------------------------
diff --git 
a/library/src/main/java/com/datatorrent/lib/math/MultiplyByConstant.java 
b/library/src/main/java/com/datatorrent/lib/math/MultiplyByConstant.java
index c1c70d3..dd56f7f 100644
--- a/library/src/main/java/com/datatorrent/lib/math/MultiplyByConstant.java
+++ b/library/src/main/java/com/datatorrent/lib/math/MultiplyByConstant.java
@@ -18,10 +18,10 @@
  */
 package com.datatorrent.lib.math;
 
-import com.datatorrent.common.util.BaseOperator;
 import com.datatorrent.api.DefaultInputPort;
 import com.datatorrent.api.DefaultOutputPort;
 import com.datatorrent.api.annotation.Stateless;
+import com.datatorrent.common.util.BaseOperator;
 
 /**
  * Multiplies input tuple (Number) by the value of property "multiplier" and 
emits the result on respective ports.
@@ -51,9 +51,9 @@ import com.datatorrent.api.annotation.Stateless;
 @Stateless
 public class MultiplyByConstant extends BaseOperator
 {
-       /**
-        * Input number port.
-        */
+  /**
+   * Input number port.
+   */
   public final transient DefaultInputPort<Number> input = new 
DefaultInputPort<Number>()
   {
     @Override

http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/3735316e/library/src/main/java/com/datatorrent/lib/math/Quotient.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/com/datatorrent/lib/math/Quotient.java 
b/library/src/main/java/com/datatorrent/lib/math/Quotient.java
index d55f205..ed08e86 100644
--- a/library/src/main/java/com/datatorrent/lib/math/Quotient.java
+++ b/library/src/main/java/com/datatorrent/lib/math/Quotient.java
@@ -47,63 +47,63 @@ import com.datatorrent.lib.util.BaseNumberValueOperator;
 @OperatorAnnotation(partitionable = false)
 public class Quotient<V extends Number> extends BaseNumberValueOperator<V>
 {
-       protected double nval = 0.0;
-       protected double dval = 0.0;
-       int mult_by = 1;
+  protected double nval = 0.0;
+  protected double dval = 0.0;
+  int mult_by = 1;
 
-       /**
-        * Numerator values input port.
-        */
-       public final transient DefaultInputPort<V> numerator = new 
DefaultInputPort<V>()
-       {
-               /**
-                * Adds to the numerator value
-                */
-               @Override
-               public void process(V tuple)
-               {
-                       nval += tuple.doubleValue();
-               }
-       };
+  /**
+   * Numerator values input port.
+   */
+  public final transient DefaultInputPort<V> numerator = new 
DefaultInputPort<V>()
+  {
+    /**
+     * Adds to the numerator value
+     */
+    @Override
+    public void process(V tuple)
+    {
+      nval += tuple.doubleValue();
+    }
+  };
 
-       /**
-        * Denominator values input port.
-        */
-       public final transient DefaultInputPort<V> denominator = new 
DefaultInputPort<V>()
-       {
-               /**
-                * Adds to the denominator value
-                */
-               @Override
-               public void process(V tuple)
-               {
-                       dval += tuple.doubleValue();
-               }
-       };
+  /**
+   * Denominator values input port.
+   */
+  public final transient DefaultInputPort<V> denominator = new 
DefaultInputPort<V>()
+  {
+    /**
+     * Adds to the denominator value
+     */
+    @Override
+    public void process(V tuple)
+    {
+      dval += tuple.doubleValue();
+    }
+  };
 
-       /**
-        * Quotient output port.
-        */
-       public final transient DefaultOutputPort<V> quotient = new 
DefaultOutputPort<V>();
+  /**
+   * Quotient output port.
+   */
+  public final transient DefaultOutputPort<V> quotient = new 
DefaultOutputPort<V>();
 
-       public void setMult_by(int i)
-       {
-               mult_by = i;
-       }
+  public void setMult_by(int i)
+  {
+    mult_by = i;
+  }
 
-       /**
-        * Generates tuple emits it as long as denominator is not 0. Clears 
internal
-        * data
-        */
-       @Override
-       public void endWindow()
-       {
-               if (dval == 0) {
-                       return;
-               }
-               double val = (nval / dval) * mult_by;
-               quotient.emit(getValue(val));
-               nval = 0.0;
-               dval = 0.0;
-       }
+  /**
+   * Generates tuple emits it as long as denominator is not 0. Clears internal
+   * data
+   */
+  @Override
+  public void endWindow()
+  {
+    if (dval == 0) {
+      return;
+    }
+    double val = (nval / dval) * mult_by;
+    quotient.emit(getValue(val));
+    nval = 0.0;
+    dval = 0.0;
+  }
 }

Reply via email to