http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/3735316e/library/src/main/java/com/datatorrent/lib/logs/MultiWindowDimensionAggregation.java ---------------------------------------------------------------------- diff --git a/library/src/main/java/com/datatorrent/lib/logs/MultiWindowDimensionAggregation.java b/library/src/main/java/com/datatorrent/lib/logs/MultiWindowDimensionAggregation.java index 7c9c350..3cace73 100644 --- a/library/src/main/java/com/datatorrent/lib/logs/MultiWindowDimensionAggregation.java +++ b/library/src/main/java/com/datatorrent/lib/logs/MultiWindowDimensionAggregation.java @@ -27,9 +27,10 @@ import java.util.regex.Pattern; import javax.validation.constraints.NotNull; -import org.apache.commons.lang.mutable.MutableDouble; -import org.slf4j.LoggerFactory; import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.commons.lang.mutable.MutableDouble; import com.datatorrent.api.Context.OperatorContext; import com.datatorrent.api.DefaultInputPort; @@ -52,9 +53,10 @@ public class MultiWindowDimensionAggregation implements Operator @SuppressWarnings("unused") private static final Logger logger = LoggerFactory.getLogger(MultiWindowDimensionAggregation.class); - public enum AggregateOperation { + public enum AggregateOperation + { SUM, AVERAGE - }; + } private int windowSize = 2; private int currentWindow = 0; @@ -76,7 +78,8 @@ public class MultiWindowDimensionAggregation implements Operator /** * This is the input port which receives multi dimensional data. */ - public final transient DefaultInputPort<Map<String, Map<String, Number>>> data = new DefaultInputPort<Map<String, Map<String, Number>>>() { + public final transient DefaultInputPort<Map<String, Map<String, Number>>> data = new DefaultInputPort<Map<String, Map<String, Number>>>() + { @Override public void process(Map<String, Map<String, Number>> tuple) { @@ -169,12 +172,15 @@ public class MultiWindowDimensionAggregation implements Operator @Override public void setup(OperatorContext arg0) { - if (arg0 != null) + if (arg0 != null) { applicationWindowSize = arg0.getValue(OperatorContext.APPLICATION_WINDOW_COUNT); - if (cacheOject == null) + } + if (cacheOject == null) { cacheOject = new HashMap<Integer, Map<String, Map<String, Number>>>(windowSize); - if (outputMap == null) + } + if (outputMap == null) { outputMap = new HashMap<String, Map<String, KeyValPair<MutableDouble, Integer>>>(); + } setUpPatternList(); } @@ -238,8 +244,9 @@ public class MultiWindowDimensionAggregation implements Operator } } currentWindowMap.clear(); - if (patternList == null || patternList.isEmpty()) + if (patternList == null || patternList.isEmpty()) { setUpPatternList(); + } } @@ -255,12 +262,13 @@ public class MultiWindowDimensionAggregation implements Operator outputData.put(e.getKey(), new DimensionObject<String>(keyVal.getKey(), dimensionValObj.getKey())); } else if (operationType == AggregateOperation.AVERAGE) { if (keyVal.getValue() != 0) { - double totalCount = ((double) (totalWindowsOccupied * applicationWindowSize)) / 1000; + double totalCount = ((double)(totalWindowsOccupied * applicationWindowSize)) / 1000; outputData.put(e.getKey(), new DimensionObject<String>(new MutableDouble(keyVal.getKey().doubleValue() / totalCount), dimensionValObj.getKey())); } } - if (!outputData.isEmpty()) + if (!outputData.isEmpty()) { output.emit(outputData); + } } } currentWindow = (currentWindow + 1) % windowSize;
http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/3735316e/library/src/main/java/com/datatorrent/lib/logs/RegexMatchMapOperator.java ---------------------------------------------------------------------- diff --git a/library/src/main/java/com/datatorrent/lib/logs/RegexMatchMapOperator.java b/library/src/main/java/com/datatorrent/lib/logs/RegexMatchMapOperator.java index d64c634..e469b0c 100644 --- a/library/src/main/java/com/datatorrent/lib/logs/RegexMatchMapOperator.java +++ b/library/src/main/java/com/datatorrent/lib/logs/RegexMatchMapOperator.java @@ -20,16 +20,19 @@ package com.datatorrent.lib.logs; import java.util.HashMap; import java.util.Map; -import com.google.code.regexp.Pattern; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + import com.google.code.regexp.Matcher; -import com.datatorrent.common.util.BaseOperator; +import com.google.code.regexp.Pattern; + +import com.datatorrent.api.Context.OperatorContext; import com.datatorrent.api.DefaultInputPort; import com.datatorrent.api.DefaultOutputPort; -import com.datatorrent.api.Context.OperatorContext; import com.datatorrent.api.annotation.OperatorAnnotation; import com.datatorrent.api.annotation.Stateless; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; +import com.datatorrent.common.util.BaseOperator; /** * This operator parses unstructured log data into named fields. @@ -77,7 +80,7 @@ import org.slf4j.LoggerFactory; * @since 1.0.5 */ @Stateless -@OperatorAnnotation(partitionable=true) +@OperatorAnnotation(partitionable = true) public class RegexMatchMapOperator extends BaseOperator { /** http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/3735316e/library/src/main/java/com/datatorrent/lib/math/AbstractAggregateCalc.java ---------------------------------------------------------------------- diff --git a/library/src/main/java/com/datatorrent/lib/math/AbstractAggregateCalc.java b/library/src/main/java/com/datatorrent/lib/math/AbstractAggregateCalc.java index 7f903fc..5f09a4b 100644 --- a/library/src/main/java/com/datatorrent/lib/math/AbstractAggregateCalc.java +++ b/library/src/main/java/com/datatorrent/lib/math/AbstractAggregateCalc.java @@ -18,10 +18,10 @@ */ package com.datatorrent.lib.math; -import com.datatorrent.api.DefaultInputPort; - import java.util.Collection; +import com.datatorrent.api.DefaultInputPort; + /** * Aggregates input tuples that are collections of longs and double and emits result on four ports. * <p> @@ -49,54 +49,52 @@ import java.util.Collection; * @since 0.3.3 */ public abstract class AbstractAggregateCalc<T extends Number> extends - AbstractOutput + AbstractOutput { - /** - * Input port, accepts collection of values of type 'T'. - */ - public final transient DefaultInputPort<Collection<T>> input = new DefaultInputPort<Collection<T>>() - { - /** - * Aggregate calculation result is only emitted on output port if it is connected. - */ - @Override - public void process(Collection<T> collection) - { - Double dResult = null; - if (doubleResult.isConnected()) { - doubleResult.emit(dResult = aggregateDoubles(collection)); - } + /** + * Input port, accepts collection of values of type 'T'. + */ + public final transient DefaultInputPort<Collection<T>> input = new DefaultInputPort<Collection<T>>() + { + /** + * Aggregate calculation result is only emitted on output port if it is connected. + */ + @Override + public void process(Collection<T> collection) + { + Double dResult = null; + if (doubleResult.isConnected()) { + doubleResult.emit(dResult = aggregateDoubles(collection)); + } - if (floatResult.isConnected()) { - floatResult - .emit(dResult == null ? (float) (aggregateDoubles(collection)) - : dResult.floatValue()); - } + if (floatResult.isConnected()) { + floatResult.emit(dResult == null ? (float)(aggregateDoubles(collection)) : dResult.floatValue()); + } - Long lResult = null; - if (longResult.isConnected()) { - longResult.emit(lResult = aggregateLongs(collection)); - } + Long lResult = null; + if (longResult.isConnected()) { + longResult.emit(lResult = aggregateLongs(collection)); + } - if (integerResult.isConnected()) { - integerResult.emit(lResult == null ? (int) aggregateLongs(collection) - : lResult.intValue()); - } - } + if (integerResult.isConnected()) { + integerResult.emit(lResult == null ? (int)aggregateLongs(collection) + : lResult.intValue()); + } + } - }; + }; - /** - * Abstract function to be implemented by sub class, custom calculation on input aggregate. - * @param collection Aggregate of values - * @return calculated value. - */ - public abstract long aggregateLongs(Collection<T> collection); + /** + * Abstract function to be implemented by sub class, custom calculation on input aggregate. + * @param collection Aggregate of values + * @return calculated value. + */ + public abstract long aggregateLongs(Collection<T> collection); - /** - * Abstract function to be implemented by sub class, custom calculation on input aggregate. - * @param collection Aggregate of values - * @return calculated value. - */ - public abstract double aggregateDoubles(Collection<T> collection); + /** + * Abstract function to be implemented by sub class, custom calculation on input aggregate. + * @param collection Aggregate of values + * @return calculated value. + */ + public abstract double aggregateDoubles(Collection<T> collection); } http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/3735316e/library/src/main/java/com/datatorrent/lib/math/AbstractOutput.java ---------------------------------------------------------------------- diff --git a/library/src/main/java/com/datatorrent/lib/math/AbstractOutput.java b/library/src/main/java/com/datatorrent/lib/math/AbstractOutput.java index bb387b4..9600021 100644 --- a/library/src/main/java/com/datatorrent/lib/math/AbstractOutput.java +++ b/library/src/main/java/com/datatorrent/lib/math/AbstractOutput.java @@ -18,9 +18,9 @@ */ package com.datatorrent.lib.math; -import com.datatorrent.common.util.BaseOperator; import com.datatorrent.api.DefaultOutputPort; import com.datatorrent.api.annotation.OutputPortFieldAnnotation; +import com.datatorrent.common.util.BaseOperator; /** * Abstract base operator defining optional double/float/long/integer output port. @@ -34,27 +34,27 @@ import com.datatorrent.api.annotation.OutputPortFieldAnnotation; */ public abstract class AbstractOutput extends BaseOperator { - /** - * Double type output. - */ - @OutputPortFieldAnnotation(optional = true) - public final transient DefaultOutputPort<Double> doubleResult = new DefaultOutputPort<Double>(); + /** + * Double type output. + */ + @OutputPortFieldAnnotation(optional = true) + public final transient DefaultOutputPort<Double> doubleResult = new DefaultOutputPort<Double>(); - /** - * Float type output. - */ - @OutputPortFieldAnnotation(optional = true) - public final transient DefaultOutputPort<Float> floatResult = new DefaultOutputPort<Float>(); + /** + * Float type output. + */ + @OutputPortFieldAnnotation(optional = true) + public final transient DefaultOutputPort<Float> floatResult = new DefaultOutputPort<Float>(); - /** - * Long type output. - */ - @OutputPortFieldAnnotation(optional = true) - public final transient DefaultOutputPort<Long> longResult = new DefaultOutputPort<Long>(); + /** + * Long type output. + */ + @OutputPortFieldAnnotation(optional = true) + public final transient DefaultOutputPort<Long> longResult = new DefaultOutputPort<Long>(); - /** - * Integer type output. - */ - @OutputPortFieldAnnotation(optional = true) - public final transient DefaultOutputPort<Integer> integerResult = new DefaultOutputPort<Integer>(); + /** + * Integer type output. + */ + @OutputPortFieldAnnotation(optional = true) + public final transient DefaultOutputPort<Integer> integerResult = new DefaultOutputPort<Integer>(); } http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/3735316e/library/src/main/java/com/datatorrent/lib/math/AbstractXmlCartesianProduct.java ---------------------------------------------------------------------- diff --git a/library/src/main/java/com/datatorrent/lib/math/AbstractXmlCartesianProduct.java b/library/src/main/java/com/datatorrent/lib/math/AbstractXmlCartesianProduct.java index a6b7ab2..10b6c15 100644 --- a/library/src/main/java/com/datatorrent/lib/math/AbstractXmlCartesianProduct.java +++ b/library/src/main/java/com/datatorrent/lib/math/AbstractXmlCartesianProduct.java @@ -18,17 +18,23 @@ */ package com.datatorrent.lib.math; -import com.datatorrent.api.Context; -import com.datatorrent.netlet.util.DTThrowable; -import com.datatorrent.lib.xml.AbstractXmlDOMOperator; +import java.util.ArrayList; +import java.util.List; + +import javax.validation.constraints.NotNull; +import javax.xml.xpath.XPath; +import javax.xml.xpath.XPathConstants; +import javax.xml.xpath.XPathExpression; +import javax.xml.xpath.XPathExpressionException; +import javax.xml.xpath.XPathFactory; + import org.w3c.dom.Document; import org.w3c.dom.Node; import org.w3c.dom.NodeList; -import javax.validation.constraints.NotNull; -import javax.xml.xpath.*; -import java.util.ArrayList; -import java.util.List; +import com.datatorrent.api.Context; +import com.datatorrent.lib.xml.AbstractXmlDOMOperator; +import com.datatorrent.netlet.util.DTThrowable; /** * An operator that performs a cartesian product between different elements in a xml document. @@ -146,7 +152,7 @@ public abstract class AbstractXmlCartesianProduct<T> extends AbstractXmlDOMOpera try { List<String> result = new ArrayList<String>(); for (CartesianProduct cartesianProduct : cartesianProducts) { - cartesianProduct.product(document, result); + cartesianProduct.product(document, result); } processResult(result, tuple); } catch (XPathExpressionException e) { @@ -252,8 +258,11 @@ public abstract class AbstractXmlCartesianProduct<T> extends AbstractXmlDOMOpera int balance = 1; int i; for (i = 1; (i < spec.length()) && (balance > 0); ++i) { - if (spec.charAt(i) == ')') balance--; - else if (spec.charAt(i) == '(') balance++; + if (spec.charAt(i) == ')') { + balance--; + } else if (spec.charAt(i) == '(') { + balance++; + } } if (i == spec.length()) { estr = spec.substring(1, spec.length() - 1); @@ -358,10 +367,10 @@ public abstract class AbstractXmlCartesianProduct<T> extends AbstractXmlDOMOpera int chldEdDelIdx = productSpec.length() - 1; int chldSepDelIdx; if ((productSpec.charAt(chldStDelIdx) == '(') && (productSpec.charAt(chldEdDelIdx) == ')') - && ((chldSepDelIdx = productSpec.indexOf(':')) != -1)) { + && ((chldSepDelIdx = productSpec.indexOf(':')) != -1)) { String child1Spec = productSpec.substring(chldStDelIdx + 1, chldSepDelIdx); String child2Spec = productSpec.substring(chldSepDelIdx + 1, chldEdDelIdx); - parentElement = (SimplePathElement) pathElement; + parentElement = (SimplePathElement)pathElement; childElement1 = pathElementFactory.getSpecable(child1Spec); childElement2 = pathElementFactory.getSpecable(child2Spec); } @@ -419,7 +428,7 @@ public abstract class AbstractXmlCartesianProduct<T> extends AbstractXmlDOMOpera private List<Node> getNodes(Document document, String path) throws XPathExpressionException { XPathExpression pathExpr = xpath.compile(path); - NodeList nodeList = (NodeList) pathExpr.evaluate(document, XPathConstants.NODESET); + NodeList nodeList = (NodeList)pathExpr.evaluate(document, XPathConstants.NODESET); List<Node> nodes = new ArrayList<Node>(); for (int i = 0; i < nodeList.getLength(); ++i) { nodes.add(nodeList.item(i)); @@ -459,8 +468,11 @@ public abstract class AbstractXmlCartesianProduct<T> extends AbstractXmlDOMOpera String delim = getDelim(); boolean first = true; for (Node node : nodes) { - if (!first) sb.append(delim); - else first = false; + if (!first) { + sb.append(delim); + } else { + first = false; + } sb.append(getValue(node)); } return sb.toString(); http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/3735316e/library/src/main/java/com/datatorrent/lib/math/AbstractXmlKeyValueCartesianProduct.java ---------------------------------------------------------------------- diff --git a/library/src/main/java/com/datatorrent/lib/math/AbstractXmlKeyValueCartesianProduct.java b/library/src/main/java/com/datatorrent/lib/math/AbstractXmlKeyValueCartesianProduct.java index 03c98d3..91cc9ba 100644 --- a/library/src/main/java/com/datatorrent/lib/math/AbstractXmlKeyValueCartesianProduct.java +++ b/library/src/main/java/com/datatorrent/lib/math/AbstractXmlKeyValueCartesianProduct.java @@ -39,7 +39,8 @@ public abstract class AbstractXmlKeyValueCartesianProduct<T> extends AbstractXml } @Override - public boolean isValueNode(Node n) { + public boolean isValueNode(Node n) + { return isTextContainerNode(n); } http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/3735316e/library/src/main/java/com/datatorrent/lib/math/Average.java ---------------------------------------------------------------------- diff --git a/library/src/main/java/com/datatorrent/lib/math/Average.java b/library/src/main/java/com/datatorrent/lib/math/Average.java index d956e05..4dfdf1f 100644 --- a/library/src/main/java/com/datatorrent/lib/math/Average.java +++ b/library/src/main/java/com/datatorrent/lib/math/Average.java @@ -44,77 +44,77 @@ import com.datatorrent.lib.util.BaseNumberValueOperator; */ public class Average<V extends Number> extends BaseNumberValueOperator<V> { - /** - * Input port that takes a number. - */ - public final transient DefaultInputPort<V> data = new DefaultInputPort<V>() - { - /** - * Computes sum and count with each tuple - */ - @Override - public void process(V tuple) - { - sums += tuple.doubleValue(); - counts++; - } - }; + /** + * Input port that takes a number. + */ + public final transient DefaultInputPort<V> data = new DefaultInputPort<V>() + { + /** + * Computes sum and count with each tuple + */ + @Override + public void process(V tuple) + { + sums += tuple.doubleValue(); + counts++; + } + }; - /** - * Output port that emits average as a number. - */ - public final transient DefaultOutputPort<V> average = new DefaultOutputPort<V>(); + /** + * Output port that emits average as a number. + */ + public final transient DefaultOutputPort<V> average = new DefaultOutputPort<V>(); - protected double sums = 0; - protected long counts = 0; + protected double sums = 0; + protected long counts = 0; - /** - * Emit average. - */ - @Override - public void endWindow() - { - // May want to send out only if count != 0 - if (counts != 0) { - average.emit(getAverage()); - } - sums = 0; - counts = 0; - } + /** + * Emit average. + */ + @Override + public void endWindow() + { + // May want to send out only if count != 0 + if (counts != 0) { + average.emit(getAverage()); + } + sums = 0; + counts = 0; + } - /** - * Calculate average based on number type. - */ - @SuppressWarnings("unchecked") - public V getAverage() - { - if (counts == 0) { - return null; - } - V num = getValue(sums); - Number val; - switch (getType()) { - case DOUBLE: - val = new Double(num.doubleValue() / counts); - break; - case INTEGER: - int icount = (int) (num.intValue() / counts); - val = new Integer(icount); - break; - case FLOAT: - val = new Float(num.floatValue() / counts); - break; - case LONG: - val = new Long(num.longValue() / counts); - break; - case SHORT: - short scount = (short) (num.shortValue() / counts); - val = new Short(scount); - break; - default: - val = new Double(num.doubleValue() / counts); - break; - } - return (V) val; - } + /** + * Calculate average based on number type. + */ + @SuppressWarnings("unchecked") + public V getAverage() + { + if (counts == 0) { + return null; + } + V num = getValue(sums); + Number val; + switch (getType()) { + case DOUBLE: + val = new Double(num.doubleValue() / counts); + break; + case INTEGER: + int icount = (int)(num.intValue() / counts); + val = new Integer(icount); + break; + case FLOAT: + val = new Float(num.floatValue() / counts); + break; + case LONG: + val = new Long(num.longValue() / counts); + break; + case SHORT: + short scount = (short)(num.shortValue() / counts); + val = new Short(scount); + break; + default: + val = new Double(num.doubleValue() / counts); + break; + } + return (V)val; + } } http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/3735316e/library/src/main/java/com/datatorrent/lib/math/AverageKeyVal.java ---------------------------------------------------------------------- diff --git a/library/src/main/java/com/datatorrent/lib/math/AverageKeyVal.java b/library/src/main/java/com/datatorrent/lib/math/AverageKeyVal.java index e9e7e40..e443780 100644 --- a/library/src/main/java/com/datatorrent/lib/math/AverageKeyVal.java +++ b/library/src/main/java/com/datatorrent/lib/math/AverageKeyVal.java @@ -18,15 +18,17 @@ */ package com.datatorrent.lib.math; +import java.util.HashMap; +import java.util.Map; + +import org.apache.commons.lang.mutable.MutableDouble; +import org.apache.commons.lang.mutable.MutableLong; + import com.datatorrent.api.DefaultInputPort; import com.datatorrent.api.DefaultOutputPort; import com.datatorrent.api.annotation.OutputPortFieldAnnotation; import com.datatorrent.lib.util.BaseNumberKeyValueOperator; import com.datatorrent.lib.util.KeyValPair; -import java.util.HashMap; -import java.util.Map; -import org.apache.commons.lang.mutable.MutableDouble; -import org.apache.commons.lang.mutable.MutableLong; /** * @@ -57,84 +59,87 @@ import org.apache.commons.lang.mutable.MutableLong; */ public class AverageKeyVal<K> extends BaseNumberKeyValueOperator<K, Number> { - // Aggregate sum of all values seen for a key. - protected HashMap<K, MutableDouble> sums = new HashMap<K, MutableDouble>(); - - // Count of number of values seen for key. - protected HashMap<K, MutableLong> counts = new HashMap<K, MutableLong>(); - - /** - * Input port that takes a key value pair. - */ - public final transient DefaultInputPort<KeyValPair<K, ? extends Number>> data = new DefaultInputPort<KeyValPair<K, ? extends Number>>() - { - /** - * Adds the values for each key, counts the number of occurrences of each - * key and computes the average. - */ - @Override - public void process(KeyValPair<K, ? extends Number> tuple) - { - K key = tuple.getKey(); - if (!doprocessKey(key)) { - return; - } - MutableDouble val = sums.get(key); - if (val == null) { - val = new MutableDouble(tuple.getValue().doubleValue()); - } else { - val.add(tuple.getValue().doubleValue()); - } - sums.put(cloneKey(key), val); + // Aggregate sum of all values seen for a key. + protected HashMap<K, MutableDouble> sums = new HashMap<K, MutableDouble>(); + + // Count of number of values seen for key. + protected HashMap<K, MutableLong> counts = new HashMap<K, MutableLong>(); + + /** + * Input port that takes a key value pair. + */ + public final transient DefaultInputPort<KeyValPair<K, ? extends Number>> data = new DefaultInputPort<KeyValPair<K, ? extends Number>>() + { + /** + * Adds the values for each key, counts the number of occurrences of each + * key and computes the average. + */ + @Override + public void process(KeyValPair<K, ? extends Number> tuple) + { + K key = tuple.getKey(); + if (!doprocessKey(key)) { + return; + } + MutableDouble val = sums.get(key); + if (val == null) { + val = new MutableDouble(tuple.getValue().doubleValue()); + } else { + val.add(tuple.getValue().doubleValue()); + } + sums.put(cloneKey(key), val); + + MutableLong count = counts.get(key); + if (count == null) { + count = new MutableLong(0); + counts.put(cloneKey(key), count); + } + count.increment(); + } + }; + + /** + * Double average output port. + */ + @OutputPortFieldAnnotation(optional = true) + public final transient DefaultOutputPort<KeyValPair<K, Double>> doubleAverage = + new DefaultOutputPort<KeyValPair<K, Double>>(); - MutableLong count = counts.get(key); - if (count == null) { - count = new MutableLong(0); - counts.put(cloneKey(key), count); - } - count.increment(); - } - }; + /** + * Integer average output port. + */ + @OutputPortFieldAnnotation(optional = true) + public final transient DefaultOutputPort<KeyValPair<K, Integer>> intAverage = + new DefaultOutputPort<KeyValPair<K, Integer>>(); - /** - * Double average output port. - */ - @OutputPortFieldAnnotation(optional = true) - public final transient DefaultOutputPort<KeyValPair<K, Double>> doubleAverage = new DefaultOutputPort<KeyValPair<K, Double>>(); - - /** - * Integer average output port. - */ - @OutputPortFieldAnnotation(optional = true) - public final transient DefaultOutputPort<KeyValPair<K, Integer>> intAverage = new DefaultOutputPort<KeyValPair<K, Integer>>(); - - /** - * Long average output port. - */ - @OutputPortFieldAnnotation(optional = true) - public final transient DefaultOutputPort<KeyValPair<K, Long>> longAverage = new DefaultOutputPort<KeyValPair<K, Long>>(); + /** + * Long average output port. + */ + @OutputPortFieldAnnotation(optional = true) + public final transient DefaultOutputPort<KeyValPair<K, Long>> longAverage = + new DefaultOutputPort<KeyValPair<K, Long>>(); - /** - * Emits average for each key in end window. Data is computed during process - * on input port Clears the internal data before return. - */ - @Override - public void endWindow() - { - for (Map.Entry<K, MutableDouble> e : sums.entrySet()) { - K key = e.getKey(); - double d = e.getValue().doubleValue(); - if (doubleAverage.isConnected()) { - doubleAverage.emit(new KeyValPair<K, Double>(key, d / counts.get(key).doubleValue())); - } - if (intAverage.isConnected()) { - intAverage.emit(new KeyValPair<K, Integer>(key, (int) d)); - } - if (longAverage.isConnected()) { - longAverage.emit(new KeyValPair<K, Long>(key, (long) d)); - } - } - sums.clear(); - counts.clear(); - } + /** + * Emits average for each key in end window. Data is computed during process + * on input port Clears the internal data before return. + */ + @Override + public void endWindow() + { + for (Map.Entry<K, MutableDouble> e : sums.entrySet()) { + K key = e.getKey(); + double d = e.getValue().doubleValue(); + if (doubleAverage.isConnected()) { + doubleAverage.emit(new KeyValPair<K, Double>(key, d / counts.get(key).doubleValue())); + } + if (intAverage.isConnected()) { + intAverage.emit(new KeyValPair<K, Integer>(key, (int)d)); + } + if (longAverage.isConnected()) { + longAverage.emit(new KeyValPair<K, Long>(key, (long)d)); + } + } + sums.clear(); + counts.clear(); + } } http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/3735316e/library/src/main/java/com/datatorrent/lib/math/Change.java ---------------------------------------------------------------------- diff --git a/library/src/main/java/com/datatorrent/lib/math/Change.java b/library/src/main/java/com/datatorrent/lib/math/Change.java index 628839f..57bad6b 100644 --- a/library/src/main/java/com/datatorrent/lib/math/Change.java +++ b/library/src/main/java/com/datatorrent/lib/math/Change.java @@ -61,57 +61,57 @@ import com.datatorrent.lib.util.BaseNumberValueOperator; public class Change<V extends Number> extends BaseNumberValueOperator<V> { /** - * Input data port that takes a number. - */ - public final transient DefaultInputPort<V> data = new DefaultInputPort<V>() - { - /** - * Process each key, compute change or percent, and emit it. - */ - @Override - public void process(V tuple) - { - if (baseValue != 0) { // Avoid divide by zero, Emit an error tuple? - double cval = tuple.doubleValue() - baseValue; - change.emit(getValue(cval)); - percent.emit((cval / baseValue) * 100); - } - } - }; + * Input data port that takes a number. + */ + public final transient DefaultInputPort<V> data = new DefaultInputPort<V>() + { + /** + * Process each key, compute change or percent, and emit it. + */ + @Override + public void process(V tuple) + { + if (baseValue != 0) { // Avoid divide by zero, Emit an error tuple? + double cval = tuple.doubleValue() - baseValue; + change.emit(getValue(cval)); + percent.emit((cval / baseValue) * 100); + } + } + }; /** - * Input port that takes a number It stores the value for base comparison. - */ - public final transient DefaultInputPort<V> base = new DefaultInputPort<V>() - { - /** - * Process each key to store the value. If same key appears again update - * with latest value. - */ - @Override - public void process(V tuple) - { - if (tuple.doubleValue() != 0.0) { // Avoid divide by zero, Emit an error - // tuple? - baseValue = tuple.doubleValue(); - } - } - }; - - /** - * Output port that emits change in value compared to base value. - */ - @OutputPortFieldAnnotation(optional = true) - public final transient DefaultOutputPort<V> change = new DefaultOutputPort<V>(); - - /** - * Output port that emits percent change in data value compared to base value. - */ - @OutputPortFieldAnnotation(optional = true) - public final transient DefaultOutputPort<Double> percent = new DefaultOutputPort<Double>(); - - /** - * baseValue is a state full field. It is retained across windows. - */ - private double baseValue = 0; + * Input port that takes a number It stores the value for base comparison. + */ + public final transient DefaultInputPort<V> base = new DefaultInputPort<V>() + { + /** + * Process each key to store the value. If same key appears again update + * with latest value. + */ + @Override + public void process(V tuple) + { + if (tuple.doubleValue() != 0.0) { // Avoid divide by zero, Emit an error + // tuple? + baseValue = tuple.doubleValue(); + } + } + }; + + /** + * Output port that emits change in value compared to base value. + */ + @OutputPortFieldAnnotation(optional = true) + public final transient DefaultOutputPort<V> change = new DefaultOutputPort<V>(); + + /** + * Output port that emits percent change in data value compared to base value. + */ + @OutputPortFieldAnnotation(optional = true) + public final transient DefaultOutputPort<Double> percent = new DefaultOutputPort<Double>(); + + /** + * baseValue is a state full field. It is retained across windows. + */ + private double baseValue = 0; } http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/3735316e/library/src/main/java/com/datatorrent/lib/math/ChangeAlert.java ---------------------------------------------------------------------- diff --git a/library/src/main/java/com/datatorrent/lib/math/ChangeAlert.java b/library/src/main/java/com/datatorrent/lib/math/ChangeAlert.java index 01a040d..3c48016 100644 --- a/library/src/main/java/com/datatorrent/lib/math/ChangeAlert.java +++ b/library/src/main/java/com/datatorrent/lib/math/ChangeAlert.java @@ -53,66 +53,66 @@ import com.datatorrent.lib.util.KeyValPair; */ public class ChangeAlert<V extends Number> extends BaseNumberValueOperator<V> { - /** - * Input port that takes in a number. - */ - public final transient DefaultInputPort<V> data = new DefaultInputPort<V>() - { - /** - * Process each key, compute change or percent, and emit it. If we get 0 as - * tuple next will be skipped. - */ - @Override - public void process(V tuple) - { - double tval = tuple.doubleValue(); - if (baseValue == 0) { // Avoid divide by zero, Emit an error tuple? - baseValue = tval; - return; - } - double change = tval - baseValue; - double percent = (change / baseValue) * 100; - if (percent < 0.0) { - percent = 0.0 - percent; - } - if (percent > percentThreshold) { - KeyValPair<V, Double> kv = new KeyValPair<V, Double>(cloneKey(tuple), - percent); - alert.emit(kv); - } - baseValue = tval; - } - }; + /** + * Input port that takes in a number. + */ + public final transient DefaultInputPort<V> data = new DefaultInputPort<V>() + { + /** + * Process each key, compute change or percent, and emit it. If we get 0 as + * tuple next will be skipped. + */ + @Override + public void process(V tuple) + { + double tval = tuple.doubleValue(); + if (baseValue == 0) { // Avoid divide by zero, Emit an error tuple? + baseValue = tval; + return; + } + double change = tval - baseValue; + double percent = (change / baseValue) * 100; + if (percent < 0.0) { + percent = 0.0 - percent; + } + if (percent > percentThreshold) { + KeyValPair<V, Double> kv = new KeyValPair<V, Double>(cloneKey(tuple), + percent); + alert.emit(kv); + } + baseValue = tval; + } + }; - /** - * Output port which emits a key value pair. - */ - public final transient DefaultOutputPort<KeyValPair<V, Double>> alert = new DefaultOutputPort<KeyValPair<V, Double>>(); + /** + * Output port which emits a key value pair. + */ + public final transient DefaultOutputPort<KeyValPair<V, Double>> alert = new DefaultOutputPort<KeyValPair<V, Double>>(); - /** - * baseValue is a state full field. It is retained across windows - */ - private double baseValue = 0; - @Min(1) - private double percentThreshold = 0.0; + /** + * baseValue is a state full field. It is retained across windows + */ + private double baseValue = 0; + @Min(1) + private double percentThreshold = 0.0; - /** - * getter function for threshold value - * - * @return threshold value - */ - @Min(1) - public double getPercentThreshold() - { - return percentThreshold; - } + /** + * getter function for threshold value + * + * @return threshold value + */ + @Min(1) + public double getPercentThreshold() + { + return percentThreshold; + } - /** - * setter function for threshold value - */ - public void setPercentThreshold(double d) - { - percentThreshold = d; - } + /** + * setter function for threshold value + */ + public void setPercentThreshold(double d) + { + percentThreshold = d; + } } http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/3735316e/library/src/main/java/com/datatorrent/lib/math/ChangeAlertKeyVal.java ---------------------------------------------------------------------- diff --git a/library/src/main/java/com/datatorrent/lib/math/ChangeAlertKeyVal.java b/library/src/main/java/com/datatorrent/lib/math/ChangeAlertKeyVal.java index 43e098f..b0d2e77 100644 --- a/library/src/main/java/com/datatorrent/lib/math/ChangeAlertKeyVal.java +++ b/library/src/main/java/com/datatorrent/lib/math/ChangeAlertKeyVal.java @@ -52,78 +52,78 @@ import com.datatorrent.lib.util.KeyValPair; * @since 0.3.3 */ public class ChangeAlertKeyVal<K, V extends Number> extends - BaseNumberKeyValueOperator<K, V> + BaseNumberKeyValueOperator<K, V> { - /** - * Base map is a StateFull field. It is retained across windows - */ - private HashMap<K, MutableDouble> basemap = new HashMap<K, MutableDouble>(); + /** + * Base map is a StateFull field. It is retained across windows + */ + private HashMap<K, MutableDouble> basemap = new HashMap<K, MutableDouble>(); - /** - * Input data port that takes a key value pair. - */ - public final transient DefaultInputPort<KeyValPair<K, V>> data = new DefaultInputPort<KeyValPair<K, V>>() - { - /** - * Process each key, compute change or percent, and emit it. - */ - @Override - public void process(KeyValPair<K, V> tuple) - { - K key = tuple.getKey(); - double tval = tuple.getValue().doubleValue(); - MutableDouble val = basemap.get(key); - if (!doprocessKey(key)) { - return; - } - if (val == null) { // Only process keys that are in the basemap - val = new MutableDouble(tval); - basemap.put(cloneKey(key), val); - return; - } - double change = tval - val.doubleValue(); - double percent = (change / val.doubleValue()) * 100; - if (percent < 0.0) { - percent = 0.0 - percent; - } - if (percent > percentThreshold) { - KeyValPair<V, Double> dmap = new KeyValPair<V, Double>( - cloneValue(tuple.getValue()), percent); - KeyValPair<K, KeyValPair<V, Double>> otuple = new KeyValPair<K, KeyValPair<V, Double>>( - cloneKey(key), dmap); - alert.emit(otuple); - } - val.setValue(tval); - } - }; + /** + * Input data port that takes a key value pair. + */ + public final transient DefaultInputPort<KeyValPair<K, V>> data = new DefaultInputPort<KeyValPair<K, V>>() + { + /** + * Process each key, compute change or percent, and emit it. + */ + @Override + public void process(KeyValPair<K, V> tuple) + { + K key = tuple.getKey(); + double tval = tuple.getValue().doubleValue(); + MutableDouble val = basemap.get(key); + if (!doprocessKey(key)) { + return; + } + if (val == null) { // Only process keys that are in the basemap + val = new MutableDouble(tval); + basemap.put(cloneKey(key), val); + return; + } + double change = tval - val.doubleValue(); + double percent = (change / val.doubleValue()) * 100; + if (percent < 0.0) { + percent = 0.0 - percent; + } + if (percent > percentThreshold) { + KeyValPair<V, Double> dmap = new KeyValPair<V, Double>( + cloneValue(tuple.getValue()), percent); + KeyValPair<K, KeyValPair<V, Double>> otuple = new KeyValPair<K, KeyValPair<V, Double>>( + cloneKey(key), dmap); + alert.emit(otuple); + } + val.setValue(tval); + } + }; - /** - * Key,Percent Change output port. - */ - public final transient DefaultOutputPort<KeyValPair<K, KeyValPair<V, Double>>> alert = new DefaultOutputPort<KeyValPair<K, KeyValPair<V, Double>>>(); + /** + * Key,Percent Change output port. + */ + public final transient DefaultOutputPort<KeyValPair<K, KeyValPair<V, Double>>> alert = new DefaultOutputPort<KeyValPair<K, KeyValPair<V, Double>>>(); - /** - * Alert thresh hold percentage set by application. - */ - @Min(1) - private double percentThreshold = 0.0; + /** + * Alert thresh hold percentage set by application. + */ + @Min(1) + private double percentThreshold = 0.0; - /** - * getter function for threshold value - * - * @return threshold value - */ - @Min(1) - public double getPercentThreshold() - { - return percentThreshold; - } + /** + * getter function for threshold value + * + * @return threshold value + */ + @Min(1) + public double getPercentThreshold() + { + return percentThreshold; + } - /** - * setter function for threshold value - */ - public void setPercentThreshold(double d) - { - percentThreshold = d; - } + /** + * setter function for threshold value + */ + public void setPercentThreshold(double d) + { + percentThreshold = d; + } } http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/3735316e/library/src/main/java/com/datatorrent/lib/math/ChangeAlertMap.java ---------------------------------------------------------------------- diff --git a/library/src/main/java/com/datatorrent/lib/math/ChangeAlertMap.java b/library/src/main/java/com/datatorrent/lib/math/ChangeAlertMap.java index ebf16d1..e212a2d 100644 --- a/library/src/main/java/com/datatorrent/lib/math/ChangeAlertMap.java +++ b/library/src/main/java/com/datatorrent/lib/math/ChangeAlertMap.java @@ -74,7 +74,7 @@ public class ChangeAlertMap<K, V extends Number> extends BaseNumberKeyValueOpera continue; } double change = e.getValue().doubleValue() - val.doubleValue(); - double percent = (change/val.doubleValue())*100; + double percent = (change / val.doubleValue()) * 100; if (percent < 0.0) { percent = 0.0 - percent; } http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/3735316e/library/src/main/java/com/datatorrent/lib/math/ChangeKeyVal.java ---------------------------------------------------------------------- diff --git a/library/src/main/java/com/datatorrent/lib/math/ChangeKeyVal.java b/library/src/main/java/com/datatorrent/lib/math/ChangeKeyVal.java index 2e406ec..3f77052 100644 --- a/library/src/main/java/com/datatorrent/lib/math/ChangeKeyVal.java +++ b/library/src/main/java/com/datatorrent/lib/math/ChangeKeyVal.java @@ -54,8 +54,7 @@ import com.datatorrent.lib.util.KeyValPair; * @tags change, key value * @since 0.3.3 */ -public class ChangeKeyVal<K, V extends Number> extends - BaseNumberKeyValueOperator<K, V> +public class ChangeKeyVal<K, V extends Number> extends BaseNumberKeyValueOperator<K, V> { /** * basemap is a stateful field. It is retained across windows @@ -81,8 +80,7 @@ public class ChangeKeyVal<K, V extends Number> extends if (bval != null) { // Only process keys that are in the basemap double cval = tuple.getValue().doubleValue() - bval.doubleValue(); change.emit(new KeyValPair<K, V>(cloneKey(key), getValue(cval))); - percent.emit(new KeyValPair<K, Double>(cloneKey(key), (cval / bval - .doubleValue()) * 100)); + percent.emit(new KeyValPair<K, Double>(cloneKey(key), (cval / bval.doubleValue()) * 100)); } } }; http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/3735316e/library/src/main/java/com/datatorrent/lib/math/CompareExceptMap.java ---------------------------------------------------------------------- diff --git a/library/src/main/java/com/datatorrent/lib/math/CompareExceptMap.java b/library/src/main/java/com/datatorrent/lib/math/CompareExceptMap.java index 0573e3e..66bd7da 100644 --- a/library/src/main/java/com/datatorrent/lib/math/CompareExceptMap.java +++ b/library/src/main/java/com/datatorrent/lib/math/CompareExceptMap.java @@ -18,13 +18,14 @@ */ package com.datatorrent.lib.math; +import java.util.HashMap; +import java.util.Map; + import com.datatorrent.api.DefaultOutputPort; import com.datatorrent.api.annotation.OutputPortFieldAnnotation; import com.datatorrent.api.annotation.Stateless; import com.datatorrent.lib.algo.MatchMap; import com.datatorrent.lib.util.UnifierHashMap; -import java.util.HashMap; -import java.util.Map; /** * Operator compares based on the property "key", "value", and "compare". @@ -86,13 +87,13 @@ public class CompareExceptMap<K, V extends Number> extends MatchMap<K, V> /** * Output port that emits a hashmap of matched tuples after comparison. */ - @OutputPortFieldAnnotation(optional=true) + @OutputPortFieldAnnotation(optional = true) public final transient DefaultOutputPort<HashMap<K, V>> compare = match; /** * Output port that emits a hashmap of non matching tuples after comparison. */ - @OutputPortFieldAnnotation(optional=true) + @OutputPortFieldAnnotation(optional = true) public final transient DefaultOutputPort<HashMap<K, V>> except = new DefaultOutputPort<HashMap<K, V>>() { @Override http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/3735316e/library/src/main/java/com/datatorrent/lib/math/CompareMap.java ---------------------------------------------------------------------- diff --git a/library/src/main/java/com/datatorrent/lib/math/CompareMap.java b/library/src/main/java/com/datatorrent/lib/math/CompareMap.java index 540f756..3636207 100644 --- a/library/src/main/java/com/datatorrent/lib/math/CompareMap.java +++ b/library/src/main/java/com/datatorrent/lib/math/CompareMap.java @@ -18,10 +18,11 @@ */ package com.datatorrent.lib.math; +import java.util.HashMap; + import com.datatorrent.api.DefaultOutputPort; import com.datatorrent.api.annotation.Stateless; import com.datatorrent.lib.algo.MatchMap; -import java.util.HashMap; /** * This operator compares tuples subclassed from Number based on the property "key", "value", and "cmp", and matching tuples are emitted. @@ -78,8 +79,8 @@ import java.util.HashMap; @Stateless public class CompareMap<K, V extends Number> extends MatchMap<K,V> { - /** - * Output port that emits a hashmap of matching number tuples after comparison. - */ - public final transient DefaultOutputPort<HashMap<K, V>> compare = match; + /** + * Output port that emits a hashmap of matching number tuples after comparison. + */ + public final transient DefaultOutputPort<HashMap<K, V>> compare = match; } http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/3735316e/library/src/main/java/com/datatorrent/lib/math/CountKeyVal.java ---------------------------------------------------------------------- diff --git a/library/src/main/java/com/datatorrent/lib/math/CountKeyVal.java b/library/src/main/java/com/datatorrent/lib/math/CountKeyVal.java index 64c5029..d593020 100644 --- a/library/src/main/java/com/datatorrent/lib/math/CountKeyVal.java +++ b/library/src/main/java/com/datatorrent/lib/math/CountKeyVal.java @@ -50,65 +50,65 @@ import com.datatorrent.lib.util.UnifierCountOccurKey; public class CountKeyVal<K, V> extends BaseKeyValueOperator<K, V> { - /** - * Key occurrence count map. - */ - protected HashMap<K, MutableInt> counts = new HashMap<K, MutableInt>(); + /** + * Key occurrence count map. + */ + protected HashMap<K, MutableInt> counts = new HashMap<K, MutableInt>(); - /** - * Input data port that takes key value pair. - */ - public final transient DefaultInputPort<KeyValPair<K, V>> data = new DefaultInputPort<KeyValPair<K, V>>() - { - /** - * For each tuple (a key value pair): Adds the values for each key, Counts - * the number of occurrence of each key - */ - @Override - public void process(KeyValPair<K, V> tuple) - { - K key = tuple.getKey(); - MutableInt count = counts.get(key); - if (count == null) { - count = new MutableInt(0); - counts.put(cloneKey(key), count); - } - count.increment(); - } + /** + * Input data port that takes key value pair. + */ + public final transient DefaultInputPort<KeyValPair<K, V>> data = new DefaultInputPort<KeyValPair<K, V>>() + { + /** + * For each tuple (a key value pair): Adds the values for each key, Counts + * the number of occurrence of each key + */ + @Override + public void process(KeyValPair<K, V> tuple) + { + K key = tuple.getKey(); + MutableInt count = counts.get(key); + if (count == null) { + count = new MutableInt(0); + counts.put(cloneKey(key), count); + } + count.increment(); + } - @Override - public StreamCodec<KeyValPair<K, V>> getStreamCodec() - { - return getKeyValPairStreamCodec(); - } - }; + @Override + public StreamCodec<KeyValPair<K, V>> getStreamCodec() + { + return getKeyValPairStreamCodec(); + } + }; - /** - * Key, occurrence value pair output port. - */ - @OutputPortFieldAnnotation(optional = true) - public final transient DefaultOutputPort<KeyValPair<K, Integer>> count = new DefaultOutputPort<KeyValPair<K, Integer>>() - { - @Override - public UnifierCountOccurKey<K> getUnifier() - { - return new UnifierCountOccurKey<K>(); - } - }; + /** + * Key, occurrence value pair output port. + */ + @OutputPortFieldAnnotation(optional = true) + public final transient DefaultOutputPort<KeyValPair<K, Integer>> count = new DefaultOutputPort<KeyValPair<K, Integer>>() + { + @Override + public UnifierCountOccurKey<K> getUnifier() + { + return new UnifierCountOccurKey<K>(); + } + }; - /** - * Emits on all ports that are connected. Data is computed during process on - * input port and endWindow just emits it for each key. Clears the internal - * data if resetAtEndWindow is true. - */ - @SuppressWarnings({ "unchecked", "rawtypes" }) - @Override - public void endWindow() - { - for (Map.Entry<K, MutableInt> e : counts.entrySet()) { - count.emit(new KeyValPair(e.getKey(), - new Integer(e.getValue().intValue()))); - } - counts.clear(); - } + /** + * Emits on all ports that are connected. Data is computed during process on + * input port and endWindow just emits it for each key. Clears the internal + * data if resetAtEndWindow is true. + */ + @SuppressWarnings({ "unchecked", "rawtypes" }) + @Override + public void endWindow() + { + for (Map.Entry<K, MutableInt> e : counts.entrySet()) { + count.emit(new KeyValPair(e.getKey(), + new Integer(e.getValue().intValue()))); + } + counts.clear(); + } } http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/3735316e/library/src/main/java/com/datatorrent/lib/math/Division.java ---------------------------------------------------------------------- diff --git a/library/src/main/java/com/datatorrent/lib/math/Division.java b/library/src/main/java/com/datatorrent/lib/math/Division.java index 5bbb9a9..d05af18 100644 --- a/library/src/main/java/com/datatorrent/lib/math/Division.java +++ b/library/src/main/java/com/datatorrent/lib/math/Division.java @@ -20,10 +20,10 @@ package com.datatorrent.lib.math; import java.util.ArrayList; -import com.datatorrent.common.util.BaseOperator; import com.datatorrent.api.DefaultInputPort; import com.datatorrent.api.DefaultOutputPort; import com.datatorrent.api.annotation.OutputPortFieldAnnotation; +import com.datatorrent.common.util.BaseOperator; /** * This operator does division metric on consecutive tuples on ports. @@ -54,9 +54,9 @@ import com.datatorrent.api.annotation.OutputPortFieldAnnotation; */ public class Division extends BaseOperator { - /** - * Array to store numerator inputs during window. - */ + /** + * Array to store numerator inputs during window. + */ private ArrayList<Number> numer = new ArrayList<Number>(); /** @@ -83,7 +83,7 @@ public class Division extends BaseOperator if (loc > numer.size()) { loc = numer.size(); } - emit(numer.get(loc-1), denom.get(loc-1)); + emit(numer.get(loc - 1), denom.get(loc - 1)); index++; } } @@ -107,7 +107,7 @@ public class Division extends BaseOperator if (loc > numer.size()) { loc = numer.size(); } - emit(numer.get(loc-1), denom.get(loc-1)); + emit(numer.get(loc - 1), denom.get(loc - 1)); index++; } } http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/3735316e/library/src/main/java/com/datatorrent/lib/math/ExceptMap.java ---------------------------------------------------------------------- diff --git a/library/src/main/java/com/datatorrent/lib/math/ExceptMap.java b/library/src/main/java/com/datatorrent/lib/math/ExceptMap.java index 84b10b8..ddef880 100644 --- a/library/src/main/java/com/datatorrent/lib/math/ExceptMap.java +++ b/library/src/main/java/com/datatorrent/lib/math/ExceptMap.java @@ -68,35 +68,35 @@ public class ExceptMap<K, V extends Number> extends MatchMap<K, V> /** * Output port that emits non matching number tuples. */ - public final transient DefaultOutputPort<HashMap<K, V>> except = new DefaultOutputPort<HashMap<K, V>>() - { - @Override - public Unifier<HashMap<K, V>> getUnifier() - { - return new UnifierHashMap<K, V>(); - } - }; + public final transient DefaultOutputPort<HashMap<K, V>> except = new DefaultOutputPort<HashMap<K, V>>() + { + @Override + public Unifier<HashMap<K, V>> getUnifier() + { + return new UnifierHashMap<K, V>(); + } + }; - /** - * Does nothing. Overrides base as call super.tupleMatched() would emit the - * tuple - * - * @param tuple - */ - @Override - public void tupleMatched(Map<K, V> tuple) - { - } + /** + * Does nothing. Overrides base as call super.tupleMatched() would emit the + * tuple + * + * @param tuple + */ + @Override + public void tupleMatched(Map<K, V> tuple) + { + } - /** - * Emits the tuple. Calls cloneTuple to get a copy, allowing users to override - * in case objects are mutable - * - * @param tuple - */ - @Override - public void tupleNotMatched(Map<K, V> tuple) - { - except.emit(cloneTuple(tuple)); - } + /** + * Emits the tuple. Calls cloneTuple to get a copy, allowing users to override + * in case objects are mutable + * + * @param tuple + */ + @Override + public void tupleNotMatched(Map<K, V> tuple) + { + except.emit(cloneTuple(tuple)); + } } http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/3735316e/library/src/main/java/com/datatorrent/lib/math/LogicalCompare.java ---------------------------------------------------------------------- diff --git a/library/src/main/java/com/datatorrent/lib/math/LogicalCompare.java b/library/src/main/java/com/datatorrent/lib/math/LogicalCompare.java index 0b7e036..41ce5a0 100644 --- a/library/src/main/java/com/datatorrent/lib/math/LogicalCompare.java +++ b/library/src/main/java/com/datatorrent/lib/math/LogicalCompare.java @@ -18,10 +18,10 @@ */ package com.datatorrent.lib.math; -import com.datatorrent.common.util.BaseOperator; import com.datatorrent.api.DefaultInputPort; import com.datatorrent.api.DefaultOutputPort; import com.datatorrent.api.annotation.Stateless; +import com.datatorrent.common.util.BaseOperator; import com.datatorrent.common.util.Pair; /** @@ -29,7 +29,7 @@ import com.datatorrent.common.util.Pair; * <p> * If the first value is equal to second value, then the pair is emitted on equalTo, greaterThanEqualTo, and lessThanEqualTo ports. * If the first value is less than second value, then the pair is emitted on notEqualTo, lessThan and lessThanEqualTo ports. - * If the first value is greater than second value, then the pair is emitted on notEqualTo, greaterThan and greaterThanEqualTo ports. + * If the first value is greater than second value, then the pair is emitted on notEqualTo, greaterThan and greaterThanEqualTo ports. * This is a pass through operator. * <br> * StateFull : No, output is computed during current window. <br> @@ -51,61 +51,61 @@ import com.datatorrent.common.util.Pair; */ @Stateless public abstract class LogicalCompare<T extends Comparable<? super T>> extends - BaseOperator + BaseOperator { - /** - * Input port that takes a key, value pair for comparison. - */ - public final transient DefaultInputPort<Pair<T, T>> input = new DefaultInputPort<Pair<T, T>>() - { - @Override - public void process(Pair<T, T> tuple) - { - int i = tuple.first.compareTo(tuple.second); - if (i > 0) { - greaterThan.emit(tuple); - greaterThanOrEqualTo.emit(tuple); - notEqualTo.emit(tuple); - } else if (i < 0) { - lessThan.emit(tuple); - lessThanOrEqualTo.emit(tuple); - notEqualTo.emit(tuple); - } else { - equalTo.emit(tuple); - lessThanOrEqualTo.emit(tuple); - greaterThanOrEqualTo.emit(tuple); - } - } + /** + * Input port that takes a key, value pair for comparison. + */ + public final transient DefaultInputPort<Pair<T, T>> input = new DefaultInputPort<Pair<T, T>>() + { + @Override + public void process(Pair<T, T> tuple) + { + int i = tuple.first.compareTo(tuple.second); + if (i > 0) { + greaterThan.emit(tuple); + greaterThanOrEqualTo.emit(tuple); + notEqualTo.emit(tuple); + } else if (i < 0) { + lessThan.emit(tuple); + lessThanOrEqualTo.emit(tuple); + notEqualTo.emit(tuple); + } else { + equalTo.emit(tuple); + lessThanOrEqualTo.emit(tuple); + greaterThanOrEqualTo.emit(tuple); + } + } - }; + }; - /** - * Equal output port. - */ - public final transient DefaultOutputPort<Pair<T, T>> equalTo = new DefaultOutputPort<Pair<T, T>>(); + /** + * Equal output port. + */ + public final transient DefaultOutputPort<Pair<T, T>> equalTo = new DefaultOutputPort<Pair<T, T>>(); - /** - * Not Equal output port. - */ - public final transient DefaultOutputPort<Pair<T, T>> notEqualTo = new DefaultOutputPort<Pair<T, T>>(); + /** + * Not Equal output port. + */ + public final transient DefaultOutputPort<Pair<T, T>> notEqualTo = new DefaultOutputPort<Pair<T, T>>(); - /** - * Less than output port. - */ - public final transient DefaultOutputPort<Pair<T, T>> lessThan = new DefaultOutputPort<Pair<T, T>>(); + /** + * Less than output port. + */ + public final transient DefaultOutputPort<Pair<T, T>> lessThan = new DefaultOutputPort<Pair<T, T>>(); - /** - * Greater than output port. - */ - public final transient DefaultOutputPort<Pair<T, T>> greaterThan = new DefaultOutputPort<Pair<T, T>>(); + /** + * Greater than output port. + */ + public final transient DefaultOutputPort<Pair<T, T>> greaterThan = new DefaultOutputPort<Pair<T, T>>(); - /** - * Less than equal to output port. - */ - public final transient DefaultOutputPort<Pair<T, T>> lessThanOrEqualTo = new DefaultOutputPort<Pair<T, T>>(); + /** + * Less than equal to output port. + */ + public final transient DefaultOutputPort<Pair<T, T>> lessThanOrEqualTo = new DefaultOutputPort<Pair<T, T>>(); - /** - * Greater than equal to output port. - */ - public final transient DefaultOutputPort<Pair<T, T>> greaterThanOrEqualTo = new DefaultOutputPort<Pair<T, T>>(); + /** + * Greater than equal to output port. + */ + public final transient DefaultOutputPort<Pair<T, T>> greaterThanOrEqualTo = new DefaultOutputPort<Pair<T, T>>(); } http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/3735316e/library/src/main/java/com/datatorrent/lib/math/LogicalCompareToConstant.java ---------------------------------------------------------------------- diff --git a/library/src/main/java/com/datatorrent/lib/math/LogicalCompareToConstant.java b/library/src/main/java/com/datatorrent/lib/math/LogicalCompareToConstant.java index 5e98eae..659a287 100644 --- a/library/src/main/java/com/datatorrent/lib/math/LogicalCompareToConstant.java +++ b/library/src/main/java/com/datatorrent/lib/math/LogicalCompareToConstant.java @@ -18,10 +18,10 @@ */ package com.datatorrent.lib.math; -import com.datatorrent.common.util.BaseOperator; import com.datatorrent.api.DefaultInputPort; import com.datatorrent.api.DefaultOutputPort; import com.datatorrent.api.annotation.Stateless; +import com.datatorrent.common.util.BaseOperator; /** * This operator does a logical comparison of a constant with a tuple. @@ -54,78 +54,78 @@ import com.datatorrent.api.annotation.Stateless; */ @Stateless public class LogicalCompareToConstant<T extends Comparable<? super T>> extends - BaseOperator + BaseOperator { - /** - * Compare constant, set by application. - */ - private T constant; + /** + * Compare constant, set by application. + */ + private T constant; - /** - * Input port that takes a comparable to compare it with a constant. - */ - public final transient DefaultInputPort<T> input = new DefaultInputPort<T>() - { - @Override - public void process(T tuple) - { - int i = constant.compareTo(tuple); - if (i > 0) { - greaterThan.emit(tuple); - greaterThanOrEqualTo.emit(tuple); - notEqualTo.emit(tuple); - } else if (i < 0) { - lessThan.emit(tuple); - lessThanOrEqualTo.emit(tuple); - notEqualTo.emit(tuple); - } else { - equalTo.emit(tuple); - lessThanOrEqualTo.emit(tuple); - greaterThanOrEqualTo.emit(tuple); - } - } + /** + * Input port that takes a comparable to compare it with a constant. + */ + public final transient DefaultInputPort<T> input = new DefaultInputPort<T>() + { + @Override + public void process(T tuple) + { + int i = constant.compareTo(tuple); + if (i > 0) { + greaterThan.emit(tuple); + greaterThanOrEqualTo.emit(tuple); + notEqualTo.emit(tuple); + } else if (i < 0) { + lessThan.emit(tuple); + lessThanOrEqualTo.emit(tuple); + notEqualTo.emit(tuple); + } else { + equalTo.emit(tuple); + lessThanOrEqualTo.emit(tuple); + greaterThanOrEqualTo.emit(tuple); + } + } - }; + }; - /** - * Equal output port. - */ - public final transient DefaultOutputPort<T> equalTo = new DefaultOutputPort<T>(); + /** + * Equal output port. + */ + public final transient DefaultOutputPort<T> equalTo = new DefaultOutputPort<T>(); - /** - * Not Equal output port. - */ - public final transient DefaultOutputPort<T> notEqualTo = new DefaultOutputPort<T>(); + /** + * Not Equal output port. + */ + public final transient DefaultOutputPort<T> notEqualTo = new DefaultOutputPort<T>(); - /** - * Less Than output port. - */ - public final transient DefaultOutputPort<T> lessThan = new DefaultOutputPort<T>(); + /** + * Less Than output port. + */ + public final transient DefaultOutputPort<T> lessThan = new DefaultOutputPort<T>(); - /** - * Greater than output port. - */ - public final transient DefaultOutputPort<T> greaterThan = new DefaultOutputPort<T>(); - public final transient DefaultOutputPort<T> lessThanOrEqualTo = new DefaultOutputPort<T>(); - public final transient DefaultOutputPort<T> greaterThanOrEqualTo = new DefaultOutputPort<T>(); + /** + * Greater than output port. + */ + public final transient DefaultOutputPort<T> greaterThan = new DefaultOutputPort<T>(); + public final transient DefaultOutputPort<T> lessThanOrEqualTo = new DefaultOutputPort<T>(); + public final transient DefaultOutputPort<T> greaterThanOrEqualTo = new DefaultOutputPort<T>(); - /** - * Set constant for comparison. - * - * @param constant - * the constant to set - */ - public void setConstant(T constant) - { - this.constant = constant; - } + /** + * Set constant for comparison. + * + * @param constant + * the constant to set + */ + public void setConstant(T constant) + { + this.constant = constant; + } - /** - * returns the value of constant - */ - public T getConstant() - { - return constant; - } + /** + * returns the value of constant + */ + public T getConstant() + { + return constant; + } } http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/3735316e/library/src/main/java/com/datatorrent/lib/math/Margin.java ---------------------------------------------------------------------- diff --git a/library/src/main/java/com/datatorrent/lib/math/Margin.java b/library/src/main/java/com/datatorrent/lib/math/Margin.java index 1161ba8..94e15d6 100644 --- a/library/src/main/java/com/datatorrent/lib/math/Margin.java +++ b/library/src/main/java/com/datatorrent/lib/math/Margin.java @@ -50,92 +50,92 @@ import com.datatorrent.lib.util.BaseNumberValueOperator; @OperatorAnnotation(partitionable = false) public class Margin<V extends Number> extends BaseNumberValueOperator<V> { - /** - * Sum of numerator values. - */ - protected double nval = 0.0; + /** + * Sum of numerator values. + */ + protected double nval = 0.0; - /** - * sum of denominator values. - */ - protected double dval = 0.0; + /** + * sum of denominator values. + */ + protected double dval = 0.0; - /** - * Flag to output margin as percentage. - */ - protected boolean percent = false; + /** + * Flag to output margin as percentage. + */ + protected boolean percent = false; - /** - * Numerator input port. - */ - public final transient DefaultInputPort<V> numerator = new DefaultInputPort<V>() - { - /** - * Adds to the numerator value - */ - @Override - public void process(V tuple) - { - nval += tuple.doubleValue(); - } - }; + /** + * Numerator input port. + */ + public final transient DefaultInputPort<V> numerator = new DefaultInputPort<V>() + { + /** + * Adds to the numerator value + */ + @Override + public void process(V tuple) + { + nval += tuple.doubleValue(); + } + }; - /** - * Denominator input port. - */ - public final transient DefaultInputPort<V> denominator = new DefaultInputPort<V>() - { - /** - * Adds to the denominator value - */ - @Override - public void process(V tuple) - { - dval += tuple.doubleValue(); - } - }; + /** + * Denominator input port. + */ + public final transient DefaultInputPort<V> denominator = new DefaultInputPort<V>() + { + /** + * Adds to the denominator value + */ + @Override + public void process(V tuple) + { + dval += tuple.doubleValue(); + } + }; - /** - * Output margin port. - */ - public final transient DefaultOutputPort<V> margin = new DefaultOutputPort<V>(); + /** + * Output margin port. + */ + public final transient DefaultOutputPort<V> margin = new DefaultOutputPort<V>(); - /** - * getter function for percent - * - * @return percent - */ - public boolean getPercent() - { - return percent; - } + /** + * getter function for percent + * + * @return percent + */ + public boolean getPercent() + { + return percent; + } - /** - * setter function for percent - * - * @param val - * sets percent - */ - public void setPercent(boolean val) - { - percent = val; - } + /** + * setter function for percent + * + * @param val + * sets percent + */ + public void setPercent(boolean val) + { + percent = val; + } - /** - * Generates tuple emits it as long as denomitor is not 0 Clears internal data - */ - @Override - public void endWindow() - { - if (dval == 0) { - return; - } - double val = 1 - (nval / dval); - if (percent) { - val = val * 100; - } - margin.emit(getValue(val)); - nval = 0.0; - dval = 0.0; - } + /** + * Generates tuple emits it as long as denomitor is not 0 Clears internal data + */ + @Override + public void endWindow() + { + if (dval == 0) { + return; + } + double val = 1 - (nval / dval); + if (percent) { + val = val * 100; + } + margin.emit(getValue(val)); + nval = 0.0; + dval = 0.0; + } } http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/3735316e/library/src/main/java/com/datatorrent/lib/math/MarginKeyVal.java ---------------------------------------------------------------------- diff --git a/library/src/main/java/com/datatorrent/lib/math/MarginKeyVal.java b/library/src/main/java/com/datatorrent/lib/math/MarginKeyVal.java index 29d35bc..e3af508 100644 --- a/library/src/main/java/com/datatorrent/lib/math/MarginKeyVal.java +++ b/library/src/main/java/com/datatorrent/lib/math/MarginKeyVal.java @@ -23,12 +23,11 @@ import java.util.Map; import org.apache.commons.lang.mutable.MutableDouble; -import com.datatorrent.lib.util.BaseNumberKeyValueOperator; -import com.datatorrent.lib.util.KeyValPair; - import com.datatorrent.api.DefaultInputPort; import com.datatorrent.api.DefaultOutputPort; import com.datatorrent.api.StreamCodec; +import com.datatorrent.lib.util.BaseNumberKeyValueOperator; +import com.datatorrent.lib.util.KeyValPair; /** * @@ -52,135 +51,134 @@ import com.datatorrent.api.StreamCodec; * @tags sum, division, numeric, key value * @since 0.3.3 */ -public class MarginKeyVal<K, V extends Number> extends - BaseNumberKeyValueOperator<K, V> +public class MarginKeyVal<K, V extends Number> extends BaseNumberKeyValueOperator<K, V> { /** - * Numerator input port that takes a key value pair. - */ - public final transient DefaultInputPort<KeyValPair<K, V>> numerator = new DefaultInputPort<KeyValPair<K, V>>() - { - /** - * Adds tuple to the numerator hash. - */ - @Override - public void process(KeyValPair<K, V> tuple) - { - addTuple(tuple, numerators); - } - - /** - * Set StreamCodec used for partitioning. - */ - @Override - public StreamCodec<KeyValPair<K, V>> getStreamCodec() - { - return getKeyValPairStreamCodec(); - } - }; + * Numerator input port that takes a key value pair. + */ + public final transient DefaultInputPort<KeyValPair<K, V>> numerator = new DefaultInputPort<KeyValPair<K, V>>() + { + /** + * Adds tuple to the numerator hash. + */ + @Override + public void process(KeyValPair<K, V> tuple) + { + addTuple(tuple, numerators); + } + + /** + * Set StreamCodec used for partitioning. + */ + @Override + public StreamCodec<KeyValPair<K, V>> getStreamCodec() + { + return getKeyValPairStreamCodec(); + } + }; /** - * Denominator input port that takes a key value pair. - */ - public final transient DefaultInputPort<KeyValPair<K, V>> denominator = new DefaultInputPort<KeyValPair<K, V>>() - { - /** - * Adds tuple to the denominator hash. - */ - @Override - public void process(KeyValPair<K, V> tuple) - { - addTuple(tuple, denominators); - } - - /** - * Set StreamCodec used for partitioning. - */ - @Override - public StreamCodec<KeyValPair<K, V>> getStreamCodec() - { - return getKeyValPairStreamCodec(); - } - }; - - /** - * Adds the value for each key. - * - * @param tuple - * @param map - */ - public void addTuple(KeyValPair<K, V> tuple, Map<K, MutableDouble> map) - { - K key = tuple.getKey(); - if (!doprocessKey(key) || (tuple.getValue() == null)) { - return; - } - MutableDouble val = map.get(key); - if (val == null) { - val = new MutableDouble(0.0); - map.put(cloneKey(key), val); - } - val.add(tuple.getValue().doubleValue()); - } + * Denominator input port that takes a key value pair. + */ + public final transient DefaultInputPort<KeyValPair<K, V>> denominator = new DefaultInputPort<KeyValPair<K, V>>() + { + /** + * Adds tuple to the denominator hash. + */ + @Override + public void process(KeyValPair<K, V> tuple) + { + addTuple(tuple, denominators); + } + + /** + * Set StreamCodec used for partitioning. + */ + @Override + public StreamCodec<KeyValPair<K, V>> getStreamCodec() + { + return getKeyValPairStreamCodec(); + } + }; + + /** + * Adds the value for each key. + * + * @param tuple + * @param map + */ + public void addTuple(KeyValPair<K, V> tuple, Map<K, MutableDouble> map) + { + K key = tuple.getKey(); + if (!doprocessKey(key) || (tuple.getValue() == null)) { + return; + } + MutableDouble val = map.get(key); + if (val == null) { + val = new MutableDouble(0.0); + map.put(cloneKey(key), val); + } + val.add(tuple.getValue().doubleValue()); + } /** - * Output margin port that emits Key Value pairs. - */ - public final transient DefaultOutputPort<KeyValPair<K, V>> margin = new DefaultOutputPort<KeyValPair<K, V>>(); - - protected HashMap<K, MutableDouble> numerators = new HashMap<K, MutableDouble>(); - protected HashMap<K, MutableDouble> denominators = new HashMap<K, MutableDouble>(); - protected boolean percent = false; - - /** - * getter function for percent - * - * @return percent - */ - public boolean getPercent() - { - return percent; - } - - /** - * setter function for percent - * - * @param val - * sets percent - */ - public void setPercent(boolean val) - { - percent = val; - } - - /** - * Generates tuples for each key and emits them. Only keys that are in the - * denominator are iterated on If the key is only in the numerator, it gets - * ignored (cannot do divide by 0) Clears internal data - */ - @SuppressWarnings({ "rawtypes", "unchecked" }) - @Override - public void endWindow() - { - Double val; - for (Map.Entry<K, MutableDouble> e : denominators.entrySet()) { - K key = e.getKey(); - MutableDouble nval = numerators.get(key); - if (nval == null) { - nval = new MutableDouble(0.0); - } else { - numerators.remove(key); // so that all left over keys can be reported - } - if (percent) { - val = (1 - nval.doubleValue() / e.getValue().doubleValue()) * 100; - } else { - val = 1 - nval.doubleValue() / e.getValue().doubleValue(); - } - - margin.emit(new KeyValPair(key, getValue(val.doubleValue()))); - } - - numerators.clear(); - denominators.clear(); - } + * Output margin port that emits Key Value pairs. + */ + public final transient DefaultOutputPort<KeyValPair<K, V>> margin = new DefaultOutputPort<KeyValPair<K, V>>(); + + protected HashMap<K, MutableDouble> numerators = new HashMap<K, MutableDouble>(); + protected HashMap<K, MutableDouble> denominators = new HashMap<K, MutableDouble>(); + protected boolean percent = false; + + /** + * getter function for percent + * + * @return percent + */ + public boolean getPercent() + { + return percent; + } + + /** + * setter function for percent + * + * @param val + * sets percent + */ + public void setPercent(boolean val) + { + percent = val; + } + + /** + * Generates tuples for each key and emits them. Only keys that are in the + * denominator are iterated on If the key is only in the numerator, it gets + * ignored (cannot do divide by 0) Clears internal data + */ + @SuppressWarnings({ "rawtypes", "unchecked" }) + @Override + public void endWindow() + { + Double val; + for (Map.Entry<K, MutableDouble> e : denominators.entrySet()) { + K key = e.getKey(); + MutableDouble nval = numerators.get(key); + if (nval == null) { + nval = new MutableDouble(0.0); + } else { + numerators.remove(key); // so that all left over keys can be reported + } + if (percent) { + val = (1 - nval.doubleValue() / e.getValue().doubleValue()) * 100; + } else { + val = 1 - nval.doubleValue() / e.getValue().doubleValue(); + } + + margin.emit(new KeyValPair(key, getValue(val.doubleValue()))); + } + + numerators.clear(); + denominators.clear(); + } } http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/3735316e/library/src/main/java/com/datatorrent/lib/math/MarginMap.java ---------------------------------------------------------------------- diff --git a/library/src/main/java/com/datatorrent/lib/math/MarginMap.java b/library/src/main/java/com/datatorrent/lib/math/MarginMap.java index 2259d85..7ef1f81 100644 --- a/library/src/main/java/com/datatorrent/lib/math/MarginMap.java +++ b/library/src/main/java/com/datatorrent/lib/math/MarginMap.java @@ -18,13 +18,15 @@ */ package com.datatorrent.lib.math; +import java.util.HashMap; +import java.util.Map; + +import org.apache.commons.lang.mutable.MutableDouble; + import com.datatorrent.api.DefaultInputPort; import com.datatorrent.api.DefaultOutputPort; import com.datatorrent.lib.util.BaseNumberKeyValueOperator; import com.datatorrent.lib.util.UnifierHashMap; -import java.util.HashMap; -import java.util.Map; -import org.apache.commons.lang.mutable.MutableDouble; /** @@ -144,18 +146,16 @@ public class MarginMap<K, V extends Number> extends BaseNumberKeyValueOperator<K { HashMap<K, V> tuples = new HashMap<K, V>(); Double val; - for (Map.Entry<K, MutableDouble> e: denominators.entrySet()) { + for (Map.Entry<K, MutableDouble> e : denominators.entrySet()) { MutableDouble nval = numerators.get(e.getKey()); if (nval == null) { nval = new MutableDouble(0.0); - } - else { + } else { numerators.remove(e.getKey()); // so that all left over keys can be reported } if (percent) { val = (1 - nval.doubleValue() / e.getValue().doubleValue()) * 100; - } - else { + } else { val = 1 - nval.doubleValue() / e.getValue().doubleValue(); } tuples.put(e.getKey(), getValue(val.doubleValue())); http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/3735316e/library/src/main/java/com/datatorrent/lib/math/Max.java ---------------------------------------------------------------------- diff --git a/library/src/main/java/com/datatorrent/lib/math/Max.java b/library/src/main/java/com/datatorrent/lib/math/Max.java index e4171f6..8ec8b2f 100644 --- a/library/src/main/java/com/datatorrent/lib/math/Max.java +++ b/library/src/main/java/com/datatorrent/lib/math/Max.java @@ -64,8 +64,7 @@ public class Max<V extends Number> extends BaseNumberValueOperator<V> implements if (!flag) { high = tuple; flag = true; - } - else if (high.doubleValue() < tuple.doubleValue()) { + } else if (high.doubleValue() < tuple.doubleValue()) { high = tuple; } } @@ -74,7 +73,7 @@ public class Max<V extends Number> extends BaseNumberValueOperator<V> implements * Max value output port. */ public final transient DefaultOutputPort<V> max = new DefaultOutputPort<V>() - { + { @Override public Unifier<V> getUnifier() { http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/3735316e/library/src/main/java/com/datatorrent/lib/math/MaxKeyVal.java ---------------------------------------------------------------------- diff --git a/library/src/main/java/com/datatorrent/lib/math/MaxKeyVal.java b/library/src/main/java/com/datatorrent/lib/math/MaxKeyVal.java index 58a947f..95a0a08 100644 --- a/library/src/main/java/com/datatorrent/lib/math/MaxKeyVal.java +++ b/library/src/main/java/com/datatorrent/lib/math/MaxKeyVal.java @@ -21,12 +21,11 @@ package com.datatorrent.lib.math; import java.util.HashMap; import java.util.Map; -import com.datatorrent.lib.util.BaseNumberKeyValueOperator; -import com.datatorrent.lib.util.KeyValPair; - import com.datatorrent.api.DefaultInputPort; import com.datatorrent.api.DefaultOutputPort; import com.datatorrent.api.StreamCodec; +import com.datatorrent.lib.util.BaseNumberKeyValueOperator; +import com.datatorrent.lib.util.KeyValPair; /** * @@ -68,8 +67,7 @@ public class MaxKeyVal<K, V extends Number> extends BaseNumberKeyValueOperator<K if (val == null) { val = tval; highs.put(cloneKey(key), val); - } - else if (val.doubleValue() < tval.doubleValue()) { + } else if (val.doubleValue() < tval.doubleValue()) { highs.put(key, tval); } } @@ -97,7 +95,7 @@ public class MaxKeyVal<K, V extends Number> extends BaseNumberKeyValueOperator<K * Clears internal data. Node only works in windowed mode. */ @SuppressWarnings({ "rawtypes", "unchecked" }) - @Override + @Override public void endWindow() { if (!highs.isEmpty()) { http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/3735316e/library/src/main/java/com/datatorrent/lib/math/Min.java ---------------------------------------------------------------------- diff --git a/library/src/main/java/com/datatorrent/lib/math/Min.java b/library/src/main/java/com/datatorrent/lib/math/Min.java index 244d990..4b3fa23 100644 --- a/library/src/main/java/com/datatorrent/lib/math/Min.java +++ b/library/src/main/java/com/datatorrent/lib/math/Min.java @@ -41,17 +41,17 @@ import com.datatorrent.lib.util.BaseNumberValueOperator; */ public class Min<V extends Number> extends BaseNumberValueOperator<V> implements Unifier<V> { - /** - * Computed low value. - */ + /** + * Computed low value. + */ protected V low; // transient field protected boolean flag = false; - /** - * Input port that takes a number and compares to min and stores the new min. - */ + /** + * Input port that takes a number and compares to min and stores the new min. + */ public final transient DefaultInputPort<V> data = new DefaultInputPort<V>() { /** @@ -73,8 +73,7 @@ public class Min<V extends Number> extends BaseNumberValueOperator<V> implements if (!flag) { low = tuple; flag = true; - } - else if (low.doubleValue() > tuple.doubleValue()) { + } else if (low.doubleValue() > tuple.doubleValue()) { low = tuple; } } http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/3735316e/library/src/main/java/com/datatorrent/lib/math/MinKeyVal.java ---------------------------------------------------------------------- diff --git a/library/src/main/java/com/datatorrent/lib/math/MinKeyVal.java b/library/src/main/java/com/datatorrent/lib/math/MinKeyVal.java index 5ea710b..2468239 100644 --- a/library/src/main/java/com/datatorrent/lib/math/MinKeyVal.java +++ b/library/src/main/java/com/datatorrent/lib/math/MinKeyVal.java @@ -21,12 +21,11 @@ package com.datatorrent.lib.math; import java.util.HashMap; import java.util.Map; -import com.datatorrent.lib.util.BaseNumberKeyValueOperator; -import com.datatorrent.lib.util.KeyValPair; - import com.datatorrent.api.DefaultInputPort; import com.datatorrent.api.DefaultOutputPort; import com.datatorrent.api.StreamCodec; +import com.datatorrent.lib.util.BaseNumberKeyValueOperator; +import com.datatorrent.lib.util.KeyValPair; /** * @@ -48,9 +47,9 @@ import com.datatorrent.api.StreamCodec; */ public class MinKeyVal<K, V extends Number> extends BaseNumberKeyValueOperator<K, V> { - /** - * Input port which takes a key vaue pair and updates the value for each key if there is a new min. - */ + /** + * Input port which takes a key vaue pair and updates the value for each key if there is a new min. + */ public final transient DefaultInputPort<KeyValPair<K, V>> data = new DefaultInputPort<KeyValPair<K, V>>() { /** @@ -67,8 +66,7 @@ public class MinKeyVal<K, V extends Number> extends BaseNumberKeyValueOperator<K V val = mins.get(key); if (val == null) { mins.put(cloneKey(key), tval); - } - else if (val.doubleValue() > tval.doubleValue()) { + } else if (val.doubleValue() > tval.doubleValue()) { mins.put(key, tval); } } @@ -94,7 +92,7 @@ public class MinKeyVal<K, V extends Number> extends BaseNumberKeyValueOperator<K * Clears internal data. Node only works in windowed mode. */ @SuppressWarnings({ "unchecked", "rawtypes" }) - @Override + @Override public void endWindow() { if (!mins.isEmpty()) { http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/3735316e/library/src/main/java/com/datatorrent/lib/math/MultiplyByConstant.java ---------------------------------------------------------------------- diff --git a/library/src/main/java/com/datatorrent/lib/math/MultiplyByConstant.java b/library/src/main/java/com/datatorrent/lib/math/MultiplyByConstant.java index c1c70d3..dd56f7f 100644 --- a/library/src/main/java/com/datatorrent/lib/math/MultiplyByConstant.java +++ b/library/src/main/java/com/datatorrent/lib/math/MultiplyByConstant.java @@ -18,10 +18,10 @@ */ package com.datatorrent.lib.math; -import com.datatorrent.common.util.BaseOperator; import com.datatorrent.api.DefaultInputPort; import com.datatorrent.api.DefaultOutputPort; import com.datatorrent.api.annotation.Stateless; +import com.datatorrent.common.util.BaseOperator; /** * Multiplies input tuple (Number) by the value of property "multiplier" and emits the result on respective ports. @@ -51,9 +51,9 @@ import com.datatorrent.api.annotation.Stateless; @Stateless public class MultiplyByConstant extends BaseOperator { - /** - * Input number port. - */ + /** + * Input number port. + */ public final transient DefaultInputPort<Number> input = new DefaultInputPort<Number>() { @Override http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/3735316e/library/src/main/java/com/datatorrent/lib/math/Quotient.java ---------------------------------------------------------------------- diff --git a/library/src/main/java/com/datatorrent/lib/math/Quotient.java b/library/src/main/java/com/datatorrent/lib/math/Quotient.java index d55f205..ed08e86 100644 --- a/library/src/main/java/com/datatorrent/lib/math/Quotient.java +++ b/library/src/main/java/com/datatorrent/lib/math/Quotient.java @@ -47,63 +47,63 @@ import com.datatorrent.lib.util.BaseNumberValueOperator; @OperatorAnnotation(partitionable = false) public class Quotient<V extends Number> extends BaseNumberValueOperator<V> { - protected double nval = 0.0; - protected double dval = 0.0; - int mult_by = 1; + protected double nval = 0.0; + protected double dval = 0.0; + int mult_by = 1; - /** - * Numerator values input port. - */ - public final transient DefaultInputPort<V> numerator = new DefaultInputPort<V>() - { - /** - * Adds to the numerator value - */ - @Override - public void process(V tuple) - { - nval += tuple.doubleValue(); - } - }; + /** + * Numerator values input port. + */ + public final transient DefaultInputPort<V> numerator = new DefaultInputPort<V>() + { + /** + * Adds to the numerator value + */ + @Override + public void process(V tuple) + { + nval += tuple.doubleValue(); + } + }; - /** - * Denominator values input port. - */ - public final transient DefaultInputPort<V> denominator = new DefaultInputPort<V>() - { - /** - * Adds to the denominator value - */ - @Override - public void process(V tuple) - { - dval += tuple.doubleValue(); - } - }; + /** + * Denominator values input port. + */ + public final transient DefaultInputPort<V> denominator = new DefaultInputPort<V>() + { + /** + * Adds to the denominator value + */ + @Override + public void process(V tuple) + { + dval += tuple.doubleValue(); + } + }; - /** - * Quotient output port. - */ - public final transient DefaultOutputPort<V> quotient = new DefaultOutputPort<V>(); + /** + * Quotient output port. + */ + public final transient DefaultOutputPort<V> quotient = new DefaultOutputPort<V>(); - public void setMult_by(int i) - { - mult_by = i; - } + public void setMult_by(int i) + { + mult_by = i; + } - /** - * Generates tuple emits it as long as denominator is not 0. Clears internal - * data - */ - @Override - public void endWindow() - { - if (dval == 0) { - return; - } - double val = (nval / dval) * mult_by; - quotient.emit(getValue(val)); - nval = 0.0; - dval = 0.0; - } + /** + * Generates tuple emits it as long as denominator is not 0. Clears internal + * data + */ + @Override + public void endWindow() + { + if (dval == 0) { + return; + } + double val = (nval / dval) * mult_by; + quotient.emit(getValue(val)); + nval = 0.0; + dval = 0.0; + } }
