http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/3735316e/library/src/main/java/com/datatorrent/lib/streamquery/function/FirstLastFunction.java ---------------------------------------------------------------------- diff --git a/library/src/main/java/com/datatorrent/lib/streamquery/function/FirstLastFunction.java b/library/src/main/java/com/datatorrent/lib/streamquery/function/FirstLastFunction.java index 0a4fd1a..db2c2b7 100644 --- a/library/src/main/java/com/datatorrent/lib/streamquery/function/FirstLastFunction.java +++ b/library/src/main/java/com/datatorrent/lib/streamquery/function/FirstLastFunction.java @@ -51,7 +51,7 @@ public class FirstLastFunction extends FunctionIndex * @param alias Alias name for output. * @param isFirst return first value if true. */ - public FirstLastFunction(@NotNull String column,String alias, boolean isLast) + public FirstLastFunction(@NotNull String column, String alias, boolean isLast) { super(column, alias); isFirst = !isLast; @@ -63,14 +63,20 @@ public class FirstLastFunction extends FunctionIndex @Override public Object compute(@NotNull ArrayList<Map<String, Object>> rows) throws Exception { - if (rows.size() == 0) return null; + if (rows.size() == 0) { + return null; + } if (isFirst) { - for (int i=0; i < rows.size(); i++) { - if (rows.get(i).get(column) != null) return rows.get(i).get(column); + for (int i = 0; i < rows.size(); i++) { + if (rows.get(i).get(column) != null) { + return rows.get(i).get(column); + } } } else { - for (int i= (rows.size()-1); i >= 0; i--) { - if (rows.get(i).get(column) != null) return rows.get(i).get(column); + for (int i = (rows.size() - 1); i >= 0; i--) { + if (rows.get(i).get(column) != null) { + return rows.get(i).get(column); + } } } return null; @@ -83,9 +89,11 @@ public class FirstLastFunction extends FunctionIndex @Override protected String aggregateName() { - if (!StringUtils.isEmpty(alias)) return alias; + if (!StringUtils.isEmpty(alias)) { + return alias; + } if (isFirst) { - return "FIRST(" + column + ")"; + return "FIRST(" + column + ")"; } return "LAST(" + column + ")"; }
http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/3735316e/library/src/main/java/com/datatorrent/lib/streamquery/function/FunctionIndex.java ---------------------------------------------------------------------- diff --git a/library/src/main/java/com/datatorrent/lib/streamquery/function/FunctionIndex.java b/library/src/main/java/com/datatorrent/lib/streamquery/function/FunctionIndex.java index 57376e4..918ca89 100644 --- a/library/src/main/java/com/datatorrent/lib/streamquery/function/FunctionIndex.java +++ b/library/src/main/java/com/datatorrent/lib/streamquery/function/FunctionIndex.java @@ -36,7 +36,7 @@ import javax.validation.constraints.NotNull; * @tags sql aggregate * @since 0.3.4 */ -abstract public class FunctionIndex +public abstract class FunctionIndex { /** * Column name. @@ -64,13 +64,13 @@ abstract public class FunctionIndex * @param rows Tuple list over application window. * @return aggregate result object. */ - abstract public Object compute(@NotNull ArrayList<Map<String, Object>> rows) throws Exception; + public abstract Object compute(@NotNull ArrayList<Map<String, Object>> rows) throws Exception; /** * Get aggregate output value name. * @return name string. */ - abstract protected String aggregateName(); + protected abstract String aggregateName(); /** * Apply compute function to given rows and store result in collect by output value name. @@ -78,10 +78,16 @@ abstract public class FunctionIndex */ public void filter(ArrayList<Map<String, Object>> rows, Map<String, Object> collect) throws Exception { - if (rows == null) return; + if (rows == null) { + return; + } String name = column; - if (alias != null) name = alias; - if (name == null) name = aggregateName(); + if (alias != null) { + name = alias; + } + if (name == null) { + name = aggregateName(); + } collect.put(name, compute(rows)); } } http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/3735316e/library/src/main/java/com/datatorrent/lib/streamquery/function/MaxMinFunction.java ---------------------------------------------------------------------- diff --git a/library/src/main/java/com/datatorrent/lib/streamquery/function/MaxMinFunction.java b/library/src/main/java/com/datatorrent/lib/streamquery/function/MaxMinFunction.java index 596e080..f02e82c 100644 --- a/library/src/main/java/com/datatorrent/lib/streamquery/function/MaxMinFunction.java +++ b/library/src/main/java/com/datatorrent/lib/streamquery/function/MaxMinFunction.java @@ -67,8 +67,10 @@ public class MaxMinFunction extends FunctionIndex double minMax = 0.0; for (Map<String, Object> row : rows) { double value = ((Number)row.get(column)).doubleValue(); - if ((isMax && (minMax < value))||(!isMax && (minMax > value))) minMax = value; - } + if ((isMax && (minMax < value)) || (!isMax && (minMax > value))) { + minMax = value; + } + } return minMax; } @@ -79,8 +81,12 @@ public class MaxMinFunction extends FunctionIndex @Override protected String aggregateName() { - if (!StringUtils.isEmpty(alias)) return alias; - if (isMax) return "MAX(" + column + ")"; + if (!StringUtils.isEmpty(alias)) { + return alias; + } + if (isMax) { + return "MAX(" + column + ")"; + } return "MIN(" + column + ")"; } http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/3735316e/library/src/main/java/com/datatorrent/lib/streamquery/function/SumFunction.java ---------------------------------------------------------------------- diff --git a/library/src/main/java/com/datatorrent/lib/streamquery/function/SumFunction.java b/library/src/main/java/com/datatorrent/lib/streamquery/function/SumFunction.java index 484aa95..02186cd 100644 --- a/library/src/main/java/com/datatorrent/lib/streamquery/function/SumFunction.java +++ b/library/src/main/java/com/datatorrent/lib/streamquery/function/SumFunction.java @@ -45,8 +45,10 @@ public class SumFunction extends FunctionIndex { Double result = 0.0; for (Map<String, Object> row : rows) { - if (!row.containsKey(column)) continue; - result += ((Number)row.get(column)).doubleValue(); + if (!row.containsKey(column)) { + continue; + } + result += ((Number)row.get(column)).doubleValue(); } return result; } @@ -54,7 +56,7 @@ public class SumFunction extends FunctionIndex @Override protected String aggregateName() { - return "Sum(" + column; + return "Sum(" + column; } } http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/3735316e/library/src/main/java/com/datatorrent/lib/streamquery/index/BinaryExpression.java ---------------------------------------------------------------------- diff --git a/library/src/main/java/com/datatorrent/lib/streamquery/index/BinaryExpression.java b/library/src/main/java/com/datatorrent/lib/streamquery/index/BinaryExpression.java index 23fa86b..21c1d11 100644 --- a/library/src/main/java/com/datatorrent/lib/streamquery/index/BinaryExpression.java +++ b/library/src/main/java/com/datatorrent/lib/streamquery/index/BinaryExpression.java @@ -30,7 +30,7 @@ import javax.validation.constraints.NotNull; * @tags alias * @since 0.3.4 */ -abstract public class BinaryExpression implements Index +public abstract class BinaryExpression implements Index { /** * Left column name argument for expression. @@ -50,9 +50,9 @@ abstract public class BinaryExpression implements Index protected String alias; /** - * @param Left column name argument for expression. - * @param Right column name argument for expression. - * @param Alias name for output field. + * @param left column name argument for expression. + * @param right column name argument for expression. + * @param alias name for output field. */ public BinaryExpression(@NotNull String left, @NotNull String right, String alias) { http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/3735316e/library/src/main/java/com/datatorrent/lib/streamquery/index/ColumnIndex.java ---------------------------------------------------------------------- diff --git a/library/src/main/java/com/datatorrent/lib/streamquery/index/ColumnIndex.java b/library/src/main/java/com/datatorrent/lib/streamquery/index/ColumnIndex.java index 78cc547..a4ad2b7 100644 --- a/library/src/main/java/com/datatorrent/lib/streamquery/index/ColumnIndex.java +++ b/library/src/main/java/com/datatorrent/lib/streamquery/index/ColumnIndex.java @@ -53,7 +53,9 @@ public class ColumnIndex implements Index public void filter(@NotNull Map<String, Object> row, @NotNull Map<String, Object> collect) { String name = getColumn(); - if (alias != null) name = alias; + if (alias != null) { + name = alias; + } collect.put(name, row.get(name)); } http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/3735316e/library/src/main/java/com/datatorrent/lib/streamquery/index/Index.java ---------------------------------------------------------------------- diff --git a/library/src/main/java/com/datatorrent/lib/streamquery/index/Index.java b/library/src/main/java/com/datatorrent/lib/streamquery/index/Index.java index 890185b..5067d00 100644 --- a/library/src/main/java/com/datatorrent/lib/streamquery/index/Index.java +++ b/library/src/main/java/com/datatorrent/lib/streamquery/index/Index.java @@ -35,5 +35,5 @@ public interface Index /** * Function can key/value hash map, does metric implemented by sub class. */ - public void filter(@NotNull Map<String,Object> row, @NotNull Map<String, Object> collect); + public void filter(@NotNull Map<String, Object> row, @NotNull Map<String, Object> collect); } http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/3735316e/library/src/main/java/com/datatorrent/lib/streamquery/index/MidIndex.java ---------------------------------------------------------------------- diff --git a/library/src/main/java/com/datatorrent/lib/streamquery/index/MidIndex.java b/library/src/main/java/com/datatorrent/lib/streamquery/index/MidIndex.java index 89456b2..931ddaa 100644 --- a/library/src/main/java/com/datatorrent/lib/streamquery/index/MidIndex.java +++ b/library/src/main/java/com/datatorrent/lib/streamquery/index/MidIndex.java @@ -38,22 +38,26 @@ public class MidIndex extends ColumnIndex public MidIndex(@NotNull String column, String alias, int start) { super(column, alias); - assert(start >= 0); + assert (start >= 0); this.start = start; } @Override public void filter(@NotNull Map<String, Object> row, @NotNull Map<String, Object> collect) { - if (!row.containsKey(column)) return; + if (!row.containsKey(column)) { + return; + } if (!(row.get(column) instanceof String)) { - assert(false); + assert (false); } String name = getColumn(); - if (alias != null) name = alias; + if (alias != null) { + name = alias; + } int endIndex = start + length; - if ((length == 0)||(endIndex > ((String)row.get(column)).length())) { + if ((length == 0) || (endIndex > ((String)row.get(column)).length())) { collect.put(name, row.get(column)); } else { collect.put(name, ((String)row.get(column)).substring(start, endIndex)); @@ -67,7 +71,7 @@ public class MidIndex extends ColumnIndex public void setLength(int length) { - assert(length > 0); + assert (length > 0); this.length = length; } } http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/3735316e/library/src/main/java/com/datatorrent/lib/streamquery/index/NegateExpression.java ---------------------------------------------------------------------- diff --git a/library/src/main/java/com/datatorrent/lib/streamquery/index/NegateExpression.java b/library/src/main/java/com/datatorrent/lib/streamquery/index/NegateExpression.java index 792ad80..969e3af 100644 --- a/library/src/main/java/com/datatorrent/lib/streamquery/index/NegateExpression.java +++ b/library/src/main/java/com/datatorrent/lib/streamquery/index/NegateExpression.java @@ -40,7 +40,9 @@ public class NegateExpression extends UnaryExpression public NegateExpression(@Null String column, String alias) { super(column, alias); - if (this.alias == null) this.alias = "NEGATE(" + column + ")"; + if (this.alias == null) { + this.alias = "NEGATE(" + column + ")"; + } } /* (non-Javadoc) @@ -49,7 +51,9 @@ public class NegateExpression extends UnaryExpression @Override public void filter(Map<String, Object> row, Map<String, Object> collect) { - if (!row.containsKey(column)) return; + if (!row.containsKey(column)) { + return; + } collect.put(alias, -((Number)row.get(column)).doubleValue()); } } http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/3735316e/library/src/main/java/com/datatorrent/lib/streamquery/index/RoundDoubleIndex.java ---------------------------------------------------------------------- diff --git a/library/src/main/java/com/datatorrent/lib/streamquery/index/RoundDoubleIndex.java b/library/src/main/java/com/datatorrent/lib/streamquery/index/RoundDoubleIndex.java index 46a563f..90e16a1 100644 --- a/library/src/main/java/com/datatorrent/lib/streamquery/index/RoundDoubleIndex.java +++ b/library/src/main/java/com/datatorrent/lib/streamquery/index/RoundDoubleIndex.java @@ -37,17 +37,23 @@ public class RoundDoubleIndex extends ColumnIndex { super(column, alias); rounder = 1; - if (numDecimals > 0) rounder = (int) Math.pow(10, numDecimals); + if (numDecimals > 0) { + rounder = (int)Math.pow(10, numDecimals); + } } @Override public void filter(@NotNull Map<String, Object> row, @NotNull Map<String, Object> collect) { - if (!row.containsKey(column)) return; - double value = (Double) row.get(column); - value = Math.round(value * rounder)/rounder; + if (!row.containsKey(column)) { + return; + } + double value = (Double)row.get(column); + value = Math.round(value * rounder) / rounder; String name = getColumn(); - if (alias != null) name = alias; + if (alias != null) { + name = alias; + } collect.put(name, value); } } http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/3735316e/library/src/main/java/com/datatorrent/lib/streamquery/index/StringCaseIndex.java ---------------------------------------------------------------------- diff --git a/library/src/main/java/com/datatorrent/lib/streamquery/index/StringCaseIndex.java b/library/src/main/java/com/datatorrent/lib/streamquery/index/StringCaseIndex.java index 2d792ff..2c49a79 100644 --- a/library/src/main/java/com/datatorrent/lib/streamquery/index/StringCaseIndex.java +++ b/library/src/main/java/com/datatorrent/lib/streamquery/index/StringCaseIndex.java @@ -42,13 +42,17 @@ public class StringCaseIndex extends ColumnIndex @Override public void filter(@NotNull Map<String, Object> row, @NotNull Map<String, Object> collect) { - if (!row.containsKey(column)) return; + if (!row.containsKey(column)) { + return; + } if (!(row.get(column) instanceof String)) { - assert(false); + assert (false); } String name = getColumn(); - if (alias != null) name = alias; + if (alias != null) { + name = alias; + } if (toUpperCase) { collect.put(name, ((String)row.get(column)).toUpperCase()); } else { http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/3735316e/library/src/main/java/com/datatorrent/lib/streamquery/index/StringLenIndex.java ---------------------------------------------------------------------- diff --git a/library/src/main/java/com/datatorrent/lib/streamquery/index/StringLenIndex.java b/library/src/main/java/com/datatorrent/lib/streamquery/index/StringLenIndex.java index 4fa05b5..4dbfee1 100644 --- a/library/src/main/java/com/datatorrent/lib/streamquery/index/StringLenIndex.java +++ b/library/src/main/java/com/datatorrent/lib/streamquery/index/StringLenIndex.java @@ -40,13 +40,17 @@ public class StringLenIndex extends ColumnIndex @Override public void filter(@NotNull Map<String, Object> row, @NotNull Map<String, Object> collect) { - if (!row.containsKey(column)) return; + if (!row.containsKey(column)) { + return; + } if (!(row.get(column) instanceof String)) { - assert(false); + assert (false); } String name = getColumn(); - if (alias != null) name = alias; + if (alias != null) { + name = alias; + } collect.put(name, ((String)row.get(column)).length()); } } http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/3735316e/library/src/main/java/com/datatorrent/lib/streamquery/index/SumExpression.java ---------------------------------------------------------------------- diff --git a/library/src/main/java/com/datatorrent/lib/streamquery/index/SumExpression.java b/library/src/main/java/com/datatorrent/lib/streamquery/index/SumExpression.java index acddf51..a0144da 100644 --- a/library/src/main/java/com/datatorrent/lib/streamquery/index/SumExpression.java +++ b/library/src/main/java/com/datatorrent/lib/streamquery/index/SumExpression.java @@ -36,14 +36,16 @@ public class SumExpression extends BinaryExpression { /** - * @param Left column name argument for expression. - * @param Right column name argument for expression. - * @param Alias name for output field. + * @param left column name argument for expression. + * @param right column name argument for expression. + * @param alias name for output field. */ public SumExpression(@NotNull String left, @NotNull String right, String alias) { super(left, right, alias); - if (this.alias == null) this.alias = "SUM(" + left + "," + right + ")"; + if (this.alias == null) { + this.alias = "SUM(" + left + "," + right + ")"; + } } /* sum column values. @@ -52,7 +54,9 @@ public class SumExpression extends BinaryExpression @Override public void filter(Map<String, Object> row, Map<String, Object> collect) { - if (!row.containsKey(left) || !row.containsKey(right)) return; + if (!row.containsKey(left) || !row.containsKey(right)) { + return; + } collect.put(alias, ((Number)row.get(left)).doubleValue() + ((Number)row.get(right)).doubleValue()); } http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/3735316e/library/src/main/java/com/datatorrent/lib/streamquery/index/UnaryExpression.java ---------------------------------------------------------------------- diff --git a/library/src/main/java/com/datatorrent/lib/streamquery/index/UnaryExpression.java b/library/src/main/java/com/datatorrent/lib/streamquery/index/UnaryExpression.java index ea52986..45e90ec 100644 --- a/library/src/main/java/com/datatorrent/lib/streamquery/index/UnaryExpression.java +++ b/library/src/main/java/com/datatorrent/lib/streamquery/index/UnaryExpression.java @@ -31,7 +31,7 @@ import javax.validation.constraints.NotNull; * @tags unary, alias * @since 0.3.4 */ -abstract public class UnaryExpression implements Index +public abstract class UnaryExpression implements Index { /** * Column name argument for unary expression. @@ -45,8 +45,8 @@ abstract public class UnaryExpression implements Index protected String alias; /** - * @param Column name argument for unary expression. - * @param Alias name for output field. + * @param column name argument for unary expression. + * @param alias name for output field. */ public UnaryExpression(@NotNull String column, String alias) { http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/3735316e/library/src/main/java/com/datatorrent/lib/testbench/ArrayListTestSink.java ---------------------------------------------------------------------- diff --git a/library/src/main/java/com/datatorrent/lib/testbench/ArrayListTestSink.java b/library/src/main/java/com/datatorrent/lib/testbench/ArrayListTestSink.java index c6df099..93889be 100644 --- a/library/src/main/java/com/datatorrent/lib/testbench/ArrayListTestSink.java +++ b/library/src/main/java/com/datatorrent/lib/testbench/ArrayListTestSink.java @@ -18,12 +18,13 @@ */ package com.datatorrent.lib.testbench; -import com.datatorrent.api.Sink; - import java.util.ArrayList; import java.util.HashMap; + import org.apache.commons.lang.mutable.MutableInt; +import com.datatorrent.api.Sink; + /** * A sink implementation to collect expected test results in a HashMap. * <p> @@ -61,7 +62,7 @@ public class ArrayListTestSink<T> implements Sink<T> { this.count++; @SuppressWarnings("unchecked") - ArrayList<Object> list = (ArrayList<Object>) tuple; + ArrayList<Object> list = (ArrayList<Object>)tuple; for (Object o: list) { MutableInt val = map.get(o); if (val == null) { http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/3735316e/library/src/main/java/com/datatorrent/lib/testbench/CollectorTestSink.java ---------------------------------------------------------------------- diff --git a/library/src/main/java/com/datatorrent/lib/testbench/CollectorTestSink.java b/library/src/main/java/com/datatorrent/lib/testbench/CollectorTestSink.java index 778a82a..2fd776e 100644 --- a/library/src/main/java/com/datatorrent/lib/testbench/CollectorTestSink.java +++ b/library/src/main/java/com/datatorrent/lib/testbench/CollectorTestSink.java @@ -18,11 +18,11 @@ */ package com.datatorrent.lib.testbench; -import com.datatorrent.api.Sink; - import java.util.ArrayList; import java.util.List; +import com.datatorrent.api.Sink; + /** * A sink implementation to collect expected test results. * <p> @@ -33,7 +33,7 @@ import java.util.List; */ public class CollectorTestSink<T> implements Sink<T> { - final public List<T> collectedTuples = new ArrayList<T>(); + public final List<T> collectedTuples = new ArrayList<T>(); /** * clears data @@ -46,10 +46,10 @@ public class CollectorTestSink<T> implements Sink<T> @Override public void put(T payload) { - synchronized (collectedTuples) { - collectedTuples.add(payload); - collectedTuples.notifyAll(); - } + synchronized (collectedTuples) { + collectedTuples.add(payload); + collectedTuples.notifyAll(); + } } public void waitForResultCount(int count, long timeoutMillis) throws InterruptedException http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/3735316e/library/src/main/java/com/datatorrent/lib/testbench/CompareFilterTuples.java ---------------------------------------------------------------------- diff --git a/library/src/main/java/com/datatorrent/lib/testbench/CompareFilterTuples.java b/library/src/main/java/com/datatorrent/lib/testbench/CompareFilterTuples.java index d338d34..a309b89 100644 --- a/library/src/main/java/com/datatorrent/lib/testbench/CompareFilterTuples.java +++ b/library/src/main/java/com/datatorrent/lib/testbench/CompareFilterTuples.java @@ -21,9 +21,9 @@ package com.datatorrent.lib.testbench; import java.util.HashMap; import java.util.Map; -import com.datatorrent.common.util.BaseOperator; import com.datatorrent.api.DefaultInputPort; import com.datatorrent.api.DefaultOutputPort; +import com.datatorrent.common.util.BaseOperator; /** * <p>Implements Compare Filter Tuples class.</p> @@ -35,9 +35,12 @@ import com.datatorrent.api.DefaultOutputPort; */ public class CompareFilterTuples<k> extends BaseOperator { - // Compare type function + // Compare type function private Compare compareType = Compare.Equal; - public enum Compare { Smaller, Equal, Greater } + public enum Compare + { + Smaller, Equal, Greater + } /** * Compare the incoming value with the Property value. @@ -45,69 +48,85 @@ public class CompareFilterTuples<k> extends BaseOperator */ public void setCompareType(Compare type) { - compareType = type; + compareType = type; } // compare value private int value; public void setValue(int value) { - this.value = value; + this.value = value; } // Collected result tuples private Map<k, Integer> result; - /** - * Input port that takes a map of integer values. - */ - public final transient DefaultInputPort<Map<k, Integer>> inport = new DefaultInputPort<Map<k, Integer>>() { + /** + * Input port that takes a map of integer values. + */ + + public final transient DefaultInputPort<Map<k, Integer>> inport = new DefaultInputPort<Map<k, Integer>>() + { @Override - public void process(Map<k, Integer> map) { - for(Map.Entry<k, Integer> entry : map.entrySet()) - { - if ( compareType == Compare.Equal ) if(entry.getValue().intValue() == value) result.put(entry.getKey(), entry.getValue()); - if ( compareType == Compare.Greater ) if(entry.getValue().intValue() > value) result.put(entry.getKey(), entry.getValue()); - if ( compareType == Compare.Smaller ) if(entry.getValue().intValue() < value) result.put(entry.getKey(), entry.getValue()); - } + public void process(Map<k, Integer> map) + { + for (Map.Entry<k, Integer> entry : map.entrySet()) { + if (compareType == Compare.Equal) { + if (entry.getValue().intValue() == value) { + result.put(entry.getKey(), + entry.getValue()); + } + } + if (compareType == Compare.Greater) { + if (entry.getValue().intValue() > value) { + result.put(entry.getKey(), + entry.getValue()); + } + } + if (compareType == Compare.Smaller) { + if (entry.getValue().intValue() < value) { + result.put(entry.getKey(), + entry.getValue()); + } + } + } } - }; + }; - /** - * Output port that emits a map of integer values. - */ - public final transient DefaultOutputPort<Map<k, Integer>> outport = new DefaultOutputPort<Map<k, Integer>>(); + /** + * Output port that emits a map of integer values. + */ + public final transient DefaultOutputPort<Map<k, Integer>> outport = new DefaultOutputPort<Map<k, Integer>>(); /** - * Output redis port that emits a map of <integer,string> values. - */ - public final transient DefaultOutputPort<Map<Integer, String>> redisport = new DefaultOutputPort<Map<Integer, String>>(); + * Output redis port that emits a map of <integer,string> values. + */ + public final transient DefaultOutputPort<Map<Integer, String>> redisport = new DefaultOutputPort<Map<Integer, String>>(); - @Override - public void beginWindow(long windowId) - { - result = new HashMap<k, Integer>(); - } + @Override + public void beginWindow(long windowId) + { + result = new HashMap<k, Integer>(); + } - @Override - public void endWindow() - { - outport.emit(result); + @Override + public void endWindow() + { + outport.emit(result); - int numOuts = 1; - Integer total = 0; - for (Map.Entry<k, Integer> entry : result.entrySet()) - { - Map<Integer, String> tuple = new HashMap<Integer, String>(); - tuple.put(numOuts++, entry.getKey().toString()); - redisport.emit(tuple); - total += entry.getValue(); - } - Map<Integer, String> tuple = new HashMap<Integer, String>(); - tuple.put(numOuts++, total.toString()); - redisport.emit(tuple); - tuple = new HashMap<Integer, String>(); - tuple.put(0, new Integer(numOuts).toString()); - redisport.emit(tuple); - } + int numOuts = 1; + Integer total = 0; + for (Map.Entry<k, Integer> entry : result.entrySet()) { + Map<Integer, String> tuple = new HashMap<Integer, String>(); + tuple.put(numOuts++, entry.getKey().toString()); + redisport.emit(tuple); + total += entry.getValue(); + } + Map<Integer, String> tuple = new HashMap<Integer, String>(); + tuple.put(numOuts++, total.toString()); + redisport.emit(tuple); + tuple = new HashMap<Integer, String>(); + tuple.put(0, new Integer(numOuts).toString()); + redisport.emit(tuple); + } } http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/3735316e/library/src/main/java/com/datatorrent/lib/testbench/CountAndLastTupleTestSink.java ---------------------------------------------------------------------- diff --git a/library/src/main/java/com/datatorrent/lib/testbench/CountAndLastTupleTestSink.java b/library/src/main/java/com/datatorrent/lib/testbench/CountAndLastTupleTestSink.java index 9e4f8b5..73506ae 100644 --- a/library/src/main/java/com/datatorrent/lib/testbench/CountAndLastTupleTestSink.java +++ b/library/src/main/java/com/datatorrent/lib/testbench/CountAndLastTupleTestSink.java @@ -43,7 +43,7 @@ public class CountAndLastTupleTestSink<T> extends CountTestSink<T> @Override public void put(T tuple) { - this.tuple = tuple; - count++; + this.tuple = tuple; + count++; } } http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/3735316e/library/src/main/java/com/datatorrent/lib/testbench/CountOccurance.java ---------------------------------------------------------------------- diff --git a/library/src/main/java/com/datatorrent/lib/testbench/CountOccurance.java b/library/src/main/java/com/datatorrent/lib/testbench/CountOccurance.java index 29343c9..9f6df10 100644 --- a/library/src/main/java/com/datatorrent/lib/testbench/CountOccurance.java +++ b/library/src/main/java/com/datatorrent/lib/testbench/CountOccurance.java @@ -22,10 +22,10 @@ import java.util.Date; import java.util.HashMap; import java.util.Map; -import com.datatorrent.common.util.BaseOperator; +import com.datatorrent.api.Context.OperatorContext; import com.datatorrent.api.DefaultInputPort; import com.datatorrent.api.DefaultOutputPort; -import com.datatorrent.api.Context.OperatorContext; +import com.datatorrent.common.util.BaseOperator; /** * <p>A base implementation of an operator which does count occurrence.</p> @@ -37,70 +37,70 @@ import com.datatorrent.api.Context.OperatorContext; */ public class CountOccurance<k> extends BaseOperator { - private Map<k, Integer> collect; - public final transient DefaultInputPort<k> inport = new DefaultInputPort<k>() { + private Map<k, Integer> collect; + public final transient DefaultInputPort<k> inport = new DefaultInputPort<k>() + { @Override - public void process(k s) { - if (collect.containsKey(s)) - { - Integer value = (Integer)collect.remove(s); - collect.put(s, new Integer(value+1)); - } else { - collect.put(s, new Integer(1)); - } + public void process(k s) + { + if (collect.containsKey(s)) { + Integer value = (Integer)collect.remove(s); + collect.put(s, new Integer(value + 1)); + } else { + collect.put(s, new Integer(1)); + } } - }; + }; - @Override - public void setup(OperatorContext context) - { - } + @Override + public void setup(OperatorContext context) + { + } - @Override - public void teardown() - { - } + @Override + public void teardown() + { + } - @Override - public void beginWindow(long windowId) - { - collect = new HashMap<k, Integer>(); - } + @Override + public void beginWindow(long windowId) + { + collect = new HashMap<k, Integer>(); + } - /** - * Output port that emits a map of integer values. - */ - public final transient DefaultOutputPort<Map<k, Integer>> outport = new DefaultOutputPort<Map<k, Integer>>(); + /** + * Output port that emits a map of integer values. + */ + public final transient DefaultOutputPort<Map<k, Integer>> outport = new DefaultOutputPort<Map<k, Integer>>(); /** - * Output dimensions port that emits a map of <string,object> values. - */ - public final transient DefaultOutputPort<Map<String, Object>> dimensionOut = new DefaultOutputPort<Map<String, Object>>(); + * Output dimensions port that emits a map of <string,object> values. + */ + public final transient DefaultOutputPort<Map<String, Object>> dimensionOut = new DefaultOutputPort<Map<String, Object>>(); - /** - * Output total port that emits a map of <string,integer> count values. - */ - public final transient DefaultOutputPort<Map<String,Integer>> total = new DefaultOutputPort<Map<String,Integer>>(); + /** + * Output total port that emits a map of <string,integer> count values. + */ + public final transient DefaultOutputPort<Map<String, Integer>> total = new DefaultOutputPort<Map<String, Integer>>(); - @Override - public void endWindow() - { - outport.emit(collect); - long timestamp = new Date().getTime(); - int allcount = 0; - for(Map.Entry<k, Integer> entry : collect.entrySet()) - { - Map<String, Object> map = new HashMap<String, Object>(); - map.put("timestamp", timestamp); - map.put("item", entry.getKey()); - map.put("view", entry.getValue()); - dimensionOut.emit(map); - allcount += entry.getValue(); - } - Map<String, Integer> map = new HashMap<String, Integer>(); - map.put("total", new Integer(allcount)); - total.emit(map); - collect = null; - collect = new HashMap<k, Integer>(); - } + @Override + public void endWindow() + { + outport.emit(collect); + long timestamp = new Date().getTime(); + int allcount = 0; + for (Map.Entry<k, Integer> entry : collect.entrySet()) { + Map<String, Object> map = new HashMap<String, Object>(); + map.put("timestamp", timestamp); + map.put("item", entry.getKey()); + map.put("view", entry.getValue()); + dimensionOut.emit(map); + allcount += entry.getValue(); + } + Map<String, Integer> map = new HashMap<String, Integer>(); + map.put("total", new Integer(allcount)); + total.emit(map); + collect = null; + collect = new HashMap<k, Integer>(); + } } http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/3735316e/library/src/main/java/com/datatorrent/lib/testbench/CountTestSink.java ---------------------------------------------------------------------- diff --git a/library/src/main/java/com/datatorrent/lib/testbench/CountTestSink.java b/library/src/main/java/com/datatorrent/lib/testbench/CountTestSink.java index cc7a70b..e6a85c9 100644 --- a/library/src/main/java/com/datatorrent/lib/testbench/CountTestSink.java +++ b/library/src/main/java/com/datatorrent/lib/testbench/CountTestSink.java @@ -43,12 +43,11 @@ public class CountTestSink<T> extends CollectorTestSink<T> } /** - * * @param payload */ @Override public void put(T payload) { - count++; + count++; } } http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/3735316e/library/src/main/java/com/datatorrent/lib/testbench/EventClassifier.java ---------------------------------------------------------------------- diff --git a/library/src/main/java/com/datatorrent/lib/testbench/EventClassifier.java b/library/src/main/java/com/datatorrent/lib/testbench/EventClassifier.java index db02f22..547340e 100644 --- a/library/src/main/java/com/datatorrent/lib/testbench/EventClassifier.java +++ b/library/src/main/java/com/datatorrent/lib/testbench/EventClassifier.java @@ -18,16 +18,16 @@ */ package com.datatorrent.lib.testbench; -import com.datatorrent.common.util.BaseOperator; -import com.datatorrent.api.DefaultInputPort; -import com.datatorrent.api.DefaultOutputPort; -import com.datatorrent.api.Context.OperatorContext; - import java.util.ArrayList; import java.util.HashMap; import java.util.Map; import java.util.Random; +import com.datatorrent.api.Context.OperatorContext; +import com.datatorrent.api.DefaultInputPort; +import com.datatorrent.api.DefaultOutputPort; +import com.datatorrent.common.util.BaseOperator; + /** * An implementation of BaseOperator that creates a load with pair of keys by taking in an input stream event and adding to incoming keys * to create a new tuple that is emitted on output port data. @@ -72,51 +72,47 @@ public class EventClassifier extends BaseOperator @Override public void process(HashMap<String, Double> tuple) { - for (Map.Entry<String, Double> e: tuple.entrySet()) { - String inkey = e.getKey(); - ArrayList<Integer> alist = null; - if (inkeys != null) { - alist = inkeys.get(e.getKey()); - } - if (alist == null) { - alist = noweight; - } - - // now alist are the weights - int rval = random.nextInt(alist.get(alist.size() - 1)); - int j = 0; - int wval = 0; - for (Integer ew: alist) { - wval += ew.intValue(); - if (wval >= rval) { - break; - } - j++; - } - HashMap<String, Double> otuple = new HashMap<String, Double>(1); - String key = wtostr_index.get(j); // the key - Double keyval = null; - if (hasvalues) { - if (voper == value_operation.VOPR_REPLACE) { // replace the incoming value - keyval = keys.get(key); + for (Map.Entry<String, Double> e : tuple.entrySet()) { + String inkey = e.getKey(); + ArrayList<Integer> alist = null; + if (inkeys != null) { + alist = inkeys.get(e.getKey()); } - else if (voper == value_operation.VOPR_ADD) { - keyval = keys.get(key) + e.getValue(); + if (alist == null) { + alist = noweight; } - else if (voper == value_operation.VOPR_MULT) { - keyval = keys.get(key) * e.getValue(); + // now alist are the weights + int rval = random.nextInt(alist.get(alist.size() - 1)); + int j = 0; + int wval = 0; + for (Integer ew : alist) { + wval += ew.intValue(); + if (wval >= rval) { + break; + } + j++; } - else if (voper == value_operation.VOPR_APPEND) { // not supported yet - keyval = keys.get(key); + HashMap<String, Double> otuple = new HashMap<String, Double>(1); + String key = wtostr_index.get(j); // the key + Double keyval = null; + if (hasvalues) { + if (voper == value_operation.VOPR_REPLACE) { // replace the incoming value + keyval = keys.get(key); + } else if (voper == value_operation.VOPR_ADD) { + keyval = keys.get(key) + e.getValue(); + } else if (voper == value_operation.VOPR_MULT) { + keyval = keys.get(key) * e.getValue(); + + } else if (voper == value_operation.VOPR_APPEND) { // not supported yet + keyval = keys.get(key); + } + } else { // pass on the value from incoming tuple + keyval = e.getValue(); } + otuple.put(key + "," + inkey, keyval); + data.emit(otuple); } - else { // pass on the value from incoming tuple - keyval = e.getValue(); - } - otuple.put(key + "," + inkey, keyval); - data.emit(otuple); - } } }; @@ -124,7 +120,6 @@ public class EventClassifier extends BaseOperator * Output data port that emits a hashmap of <string,double>. */ public final transient DefaultOutputPort<HashMap<String, Double>> data = new DefaultOutputPort<HashMap<String, Double>>(); -; HashMap<String, Double> keys = new HashMap<String, Double>(); HashMap<Integer, String> wtostr_index = new HashMap<Integer, String>(); @@ -139,7 +134,8 @@ public class EventClassifier extends BaseOperator enum value_operation { VOPR_REPLACE, VOPR_ADD, VOPR_MULT, VOPR_APPEND - }; + } + value_operation voper = value_operation.VOPR_REPLACE; @@ -163,7 +159,7 @@ public class EventClassifier extends BaseOperator voper = value_operation.VOPR_MULT; } - public void setKeyWeights(HashMap<String, ArrayList<Integer>> map) + public void setKeyWeights(HashMap<String, ArrayList<Integer>> map) { if (inkeys == null) { inkeys = new HashMap<String, ArrayList<Integer>>(); @@ -183,7 +179,7 @@ public class EventClassifier extends BaseOperator } } - @Override + @Override public void setup(OperatorContext context) { noweight = new ArrayList<Integer>(); http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/3735316e/library/src/main/java/com/datatorrent/lib/testbench/EventClassifierNumberToHashDouble.java ---------------------------------------------------------------------- diff --git a/library/src/main/java/com/datatorrent/lib/testbench/EventClassifierNumberToHashDouble.java b/library/src/main/java/com/datatorrent/lib/testbench/EventClassifierNumberToHashDouble.java index 0158f3b..66ad12e 100644 --- a/library/src/main/java/com/datatorrent/lib/testbench/EventClassifierNumberToHashDouble.java +++ b/library/src/main/java/com/datatorrent/lib/testbench/EventClassifierNumberToHashDouble.java @@ -18,16 +18,18 @@ */ package com.datatorrent.lib.testbench; -import com.datatorrent.common.util.BaseOperator; -import com.datatorrent.api.DefaultInputPort; -import com.datatorrent.api.DefaultOutputPort; -import com.datatorrent.api.Context.OperatorContext; - import java.util.HashMap; + import javax.validation.constraints.NotNull; + import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import com.datatorrent.api.Context.OperatorContext; +import com.datatorrent.api.DefaultInputPort; +import com.datatorrent.api.DefaultOutputPort; +import com.datatorrent.common.util.BaseOperator; + /** * An implementation of BaseOperator that creates a load with pair of keys by taking in an input stream event and adding to incoming keys * to create a new tuple of Hashmap <String,Double> that is emitted on output port data. @@ -95,7 +97,7 @@ public class EventClassifierNumberToHashDouble<K extends Number> extends BaseOpe int seed = 0; int seed_size = 1; - String [] keys = null; + String[] keys = null; /** * setup before dag is run (pre-runtime, and post compile time) @@ -116,8 +118,7 @@ public class EventClassifierNumberToHashDouble<K extends Number> extends BaseOpe Integer ival = i; keys[i] = getKey() + ival.toString(); } - } - else { + } else { for (int i = s_end; i <= s_start; i++) { Integer ival = i; keys[i] = getKey() + ival.toString(); http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/3735316e/library/src/main/java/com/datatorrent/lib/testbench/EventGenerator.java ---------------------------------------------------------------------- diff --git a/library/src/main/java/com/datatorrent/lib/testbench/EventGenerator.java b/library/src/main/java/com/datatorrent/lib/testbench/EventGenerator.java index 8dcf264..6b2ed8f 100644 --- a/library/src/main/java/com/datatorrent/lib/testbench/EventGenerator.java +++ b/library/src/main/java/com/datatorrent/lib/testbench/EventGenerator.java @@ -105,7 +105,7 @@ public class EventGenerator implements InputOperator HashMap<Integer, String> wtostr_index = new HashMap<Integer, String>(); ArrayList<Integer> weights; int total_weight = 0; - private transient final Random random = new Random(); + private final transient Random random = new Random(); public static final int ROLLING_WINDOW_COUNT_DEFAULT = 1; @Min(1) private int rolling_window_count = ROLLING_WINDOW_COUNT_DEFAULT; @@ -157,14 +157,12 @@ public class EventGenerator implements InputOperator } weights.add(Integer.parseInt(weightsArray[i])); total_weight += Integer.parseInt(weightsArray[i]); - } - else { + } else { total_weight += 1; } if ((valuesArray != null) && valuesArray.length != 0) { keys.put(s, new Double(Double.parseDouble(valuesArray[i]))); - } - else { + } else { keys.put(s, 0.0); } wtostr_index.put(i, s); @@ -197,8 +195,7 @@ public class EventGenerator implements InputOperator long average; if (rolling_window_count == 1) { average = (tcount * 1000) / elapsedTime; - } - else { // use tuple_numbers + } else { // use tuple_numbers int slots; if (count_denominator == rolling_window_count) { tuple_numbers[tuple_index] = tcount; @@ -208,8 +205,7 @@ public class EventGenerator implements InputOperator if (tuple_index == rolling_window_count) { tuple_index = 0; } - } - else { + } else { tuple_numbers[count_denominator - 1] = tcount; time_numbers[count_denominator - 1] = elapsedTime; slots = count_denominator; @@ -259,7 +255,7 @@ public class EventGenerator implements InputOperator /** * Comma separated strings which can be used as keys - * @param value + * @param keys */ public void setKeysHelper(String keys) { @@ -275,14 +271,13 @@ public class EventGenerator implements InputOperator /** * Comma separated values which are used as weight for the same indexed keys. - * @param value + * @param weight */ public void setWeightsHelper(String weight) { if (weight.isEmpty()) { weightsArray = null; - } - else { + } else { weightsArray = weight.split(","); } } @@ -300,8 +295,7 @@ public class EventGenerator implements InputOperator { if (value.isEmpty()) { valuesArray = null; - } - else { + } else { valuesArray = value.split(","); } } @@ -341,8 +335,7 @@ public class EventGenerator implements InputOperator } j++; } - } - else { + } else { j++; j = j % keys.size(); } http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/3735316e/library/src/main/java/com/datatorrent/lib/testbench/EventIncrementer.java ---------------------------------------------------------------------- diff --git a/library/src/main/java/com/datatorrent/lib/testbench/EventIncrementer.java b/library/src/main/java/com/datatorrent/lib/testbench/EventIncrementer.java index 3668d5c..46b684d 100644 --- a/library/src/main/java/com/datatorrent/lib/testbench/EventIncrementer.java +++ b/library/src/main/java/com/datatorrent/lib/testbench/EventIncrementer.java @@ -18,14 +18,15 @@ */ package com.datatorrent.lib.testbench; -import com.datatorrent.common.util.BaseOperator; -import com.datatorrent.api.DefaultInputPort; -import com.datatorrent.api.DefaultOutputPort; -import com.datatorrent.lib.util.KeyValPair; import java.util.ArrayList; import java.util.HashMap; import java.util.Map; +import com.datatorrent.api.DefaultInputPort; +import com.datatorrent.api.DefaultOutputPort; +import com.datatorrent.common.util.BaseOperator; +import com.datatorrent.lib.util.KeyValPair; + /** * Creates a random movement by taking in a seed stream and incrementing this data. * <p> @@ -74,8 +75,7 @@ public class EventIncrementer extends BaseOperator if (keys.length != e.getValue().size()) { // bad seed return; // emit error tuple here - } - else { + } else { ArrayList<KeyValPair<String, Double>> alist = new ArrayList<KeyValPair<String, Double>>(keys.length); int j = 0; for (Integer s: e.getValue()) { @@ -189,8 +189,7 @@ public class EventIncrementer extends BaseOperator double range = high - low; if (increment > range) { // bad data, do nothing ret = current; - } - else { + } else { sign = sign * -1.0; ret += sign * increment; if (ret < low) { http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/3735316e/library/src/main/java/com/datatorrent/lib/testbench/FilterClassifier.java ---------------------------------------------------------------------- diff --git a/library/src/main/java/com/datatorrent/lib/testbench/FilterClassifier.java b/library/src/main/java/com/datatorrent/lib/testbench/FilterClassifier.java index a8ec00b..389cbb3 100644 --- a/library/src/main/java/com/datatorrent/lib/testbench/FilterClassifier.java +++ b/library/src/main/java/com/datatorrent/lib/testbench/FilterClassifier.java @@ -18,16 +18,16 @@ */ package com.datatorrent.lib.testbench; -import com.datatorrent.common.util.BaseOperator; -import com.datatorrent.api.DefaultInputPort; -import com.datatorrent.api.DefaultOutputPort; -import com.datatorrent.api.Context.OperatorContext; - import java.util.ArrayList; import java.util.HashMap; import java.util.Map; import java.util.Random; +import com.datatorrent.api.Context.OperatorContext; +import com.datatorrent.api.DefaultInputPort; +import com.datatorrent.api.DefaultOutputPort; +import com.datatorrent.common.util.BaseOperator; + /** * Filters the tuples as per the filter (pass through percent) and emits them. * <p> @@ -83,8 +83,7 @@ public class FilterClassifier<T> extends BaseOperator ArrayList<Integer> alist; if (inkeys != null) { alist = inkeys.get(inkey); - } - else { + } else { alist = noweight; } http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/3735316e/library/src/main/java/com/datatorrent/lib/testbench/FilteredEventClassifier.java ---------------------------------------------------------------------- diff --git a/library/src/main/java/com/datatorrent/lib/testbench/FilteredEventClassifier.java b/library/src/main/java/com/datatorrent/lib/testbench/FilteredEventClassifier.java index 27708dc..77d3970 100644 --- a/library/src/main/java/com/datatorrent/lib/testbench/FilteredEventClassifier.java +++ b/library/src/main/java/com/datatorrent/lib/testbench/FilteredEventClassifier.java @@ -18,16 +18,16 @@ */ package com.datatorrent.lib.testbench; -import com.datatorrent.common.util.BaseOperator; -import com.datatorrent.api.DefaultInputPort; -import com.datatorrent.api.DefaultOutputPort; -import com.datatorrent.api.Context.OperatorContext; - import java.util.ArrayList; import java.util.HashMap; import java.util.Map; import java.util.Random; +import com.datatorrent.api.Context.OperatorContext; +import com.datatorrent.api.DefaultInputPort; +import com.datatorrent.api.DefaultOutputPort; +import com.datatorrent.common.util.BaseOperator; + /** * This operator takes in a stream of tuples * and randomly emits them based on the specified total_filter and pass_filter values. @@ -110,8 +110,7 @@ public class FilteredEventClassifier<T> extends BaseOperator T keyval; if (hasvalues) { keyval = keys.get(key); - } - else { // pass on the value from incoming tuple + } else { // pass on the value from incoming tuple keyval = e.getValue(); } otuple.put(key + "," + inkey, keyval); http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/3735316e/library/src/main/java/com/datatorrent/lib/testbench/HashTestSink.java ---------------------------------------------------------------------- diff --git a/library/src/main/java/com/datatorrent/lib/testbench/HashTestSink.java b/library/src/main/java/com/datatorrent/lib/testbench/HashTestSink.java index 63cbaf5..389ecb8 100644 --- a/library/src/main/java/com/datatorrent/lib/testbench/HashTestSink.java +++ b/library/src/main/java/com/datatorrent/lib/testbench/HashTestSink.java @@ -18,11 +18,12 @@ */ package com.datatorrent.lib.testbench; -import com.datatorrent.api.Sink; - import java.util.HashMap; + import org.apache.commons.lang.mutable.MutableInt; +import com.datatorrent.api.Sink; + /** * A sink implementation, which counts the number of times each tuples is collected and stores the results in a hash map. * <p></p> @@ -54,8 +55,7 @@ public class HashTestSink<T> implements Sink<T> { int ret = -1; MutableInt val = map.get(key); - if (val != null) - { + if (val != null) { ret = val.intValue(); } return ret; @@ -64,13 +64,13 @@ public class HashTestSink<T> implements Sink<T> @Override public void put(T tuple) { - this.count++; - MutableInt val = map.get(tuple); - if (val == null) { - val = new MutableInt(0); - map.put(tuple, val); - } - val.increment(); + this.count++; + MutableInt val = map.get(tuple); + if (val == null) { + val = new MutableInt(0); + map.put(tuple, val); + } + val.increment(); } @Override http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/3735316e/library/src/main/java/com/datatorrent/lib/testbench/HttpStatusFilter.java ---------------------------------------------------------------------- diff --git a/library/src/main/java/com/datatorrent/lib/testbench/HttpStatusFilter.java b/library/src/main/java/com/datatorrent/lib/testbench/HttpStatusFilter.java index f24c6e4..08b190e 100644 --- a/library/src/main/java/com/datatorrent/lib/testbench/HttpStatusFilter.java +++ b/library/src/main/java/com/datatorrent/lib/testbench/HttpStatusFilter.java @@ -18,14 +18,13 @@ */ package com.datatorrent.lib.testbench; - import java.util.HashMap; import java.util.Map; -import com.datatorrent.common.util.BaseOperator; +import com.datatorrent.api.Context.OperatorContext; import com.datatorrent.api.DefaultInputPort; import com.datatorrent.api.DefaultOutputPort; -import com.datatorrent.api.Context.OperatorContext; +import com.datatorrent.common.util.BaseOperator; /** * This operator consumes tuples which are key value pairs of strings. @@ -39,62 +38,64 @@ import com.datatorrent.api.Context.OperatorContext; */ public class HttpStatusFilter extends BaseOperator { - private String filterStatus; - private Map<String, Integer> collect; - public final transient DefaultInputPort<Map<String, String>> inport = new DefaultInputPort<Map<String, String>>() { + private String filterStatus; + private Map<String, Integer> collect; + public final transient DefaultInputPort<Map<String, String>> inport = new DefaultInputPort<Map<String, String>>() + { @Override - public void process(Map<String, String> s) { - for(Map.Entry<String, String> entry : s.entrySet()) - { - if (!entry.getValue().equals(filterStatus)) continue; - if (collect.containsKey(entry.getKey())) - { - Integer value = (Integer)collect.remove(entry.getKey()); - collect.put(entry.getKey(), new Integer(value+1)); - } else { - collect.put(entry.getKey(), new Integer(1)); - } - } + public void process(Map<String, String> s) + { + for (Map.Entry<String, String> entry : s.entrySet()) { + if (!entry.getValue().equals(filterStatus)) { + continue; + } + if (collect.containsKey(entry.getKey())) { + Integer value = (Integer)collect.remove(entry.getKey()); + collect.put(entry.getKey(), new Integer(value + 1)); + } else { + collect.put(entry.getKey(), new Integer(1)); + } + } } - }; + }; - @Override - public void setup(OperatorContext context) - { - collect = new HashMap<String, Integer>(); - } + @Override + public void setup(OperatorContext context) + { + collect = new HashMap<String, Integer>(); + } - @Override - public void teardown() - { - } + @Override + public void teardown() + { + } - @Override - public void beginWindow(long windowId) - { - collect = new HashMap<String, Integer>(); - } + @Override + public void beginWindow(long windowId) + { + collect = new HashMap<String, Integer>(); + } - // out port - public final transient DefaultOutputPort<Map<String, Integer>> outport = new DefaultOutputPort<Map<String, Integer>>(); + // out port + public final transient DefaultOutputPort<Map<String, Integer>> outport = new DefaultOutputPort<Map<String, Integer>>(); - @Override - public void endWindow() - { - outport.emit(collect); - } + @Override + public void endWindow() + { + outport.emit(collect); + } - public String getFilterStatus() - { - return filterStatus; - } + public String getFilterStatus() + { + return filterStatus; + } - /** - * Only key with the following value is counted. - * @param filterStatus - */ - public void setFilterStatus(String filterStatus) - { - this.filterStatus = filterStatus; - } + /** + * Only key with the following value is counted. + * @param filterStatus + */ + public void setFilterStatus(String filterStatus) + { + this.filterStatus = filterStatus; + } } http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/3735316e/library/src/main/java/com/datatorrent/lib/testbench/KeyValSum.java ---------------------------------------------------------------------- diff --git a/library/src/main/java/com/datatorrent/lib/testbench/KeyValSum.java b/library/src/main/java/com/datatorrent/lib/testbench/KeyValSum.java index 0a26961..a6004dc 100644 --- a/library/src/main/java/com/datatorrent/lib/testbench/KeyValSum.java +++ b/library/src/main/java/com/datatorrent/lib/testbench/KeyValSum.java @@ -21,10 +21,10 @@ package com.datatorrent.lib.testbench; import java.util.HashMap; import java.util.Map; -import com.datatorrent.common.util.BaseOperator; +import com.datatorrent.api.Context.OperatorContext; import com.datatorrent.api.DefaultInputPort; import com.datatorrent.api.DefaultOutputPort; -import com.datatorrent.api.Context.OperatorContext; +import com.datatorrent.common.util.BaseOperator; /** * This operator consumes maps whose keys are strings and values are integers. @@ -37,51 +37,51 @@ import com.datatorrent.api.Context.OperatorContext; */ public class KeyValSum extends BaseOperator { - private Map<String, Integer> collect; + private Map<String, Integer> collect; /** * This input port on which tuples are received. */ - public final transient DefaultInputPort<Map<String, Integer>> inport = new DefaultInputPort<Map<String, Integer>>() { + public final transient DefaultInputPort<Map<String, Integer>> inport = new DefaultInputPort<Map<String, Integer>>() + { @Override - public void process(Map<String, Integer> s) { - for(Map.Entry<String, Integer> entry : s.entrySet()) - { - if (collect.containsKey(entry.getKey())) - { - Integer value = (Integer)collect.remove(entry.getKey()); - collect.put(entry.getKey(), value + entry.getValue()); - } else { - collect.put(entry.getKey(), entry.getValue()); - } - } + public void process(Map<String, Integer> s) + { + for (Map.Entry<String, Integer> entry : s.entrySet()) { + if (collect.containsKey(entry.getKey())) { + Integer value = (Integer)collect.remove(entry.getKey()); + collect.put(entry.getKey(), value + entry.getValue()); + } else { + collect.put(entry.getKey(), entry.getValue()); + } + } } - }; + }; - @Override - public void setup(OperatorContext context) - { - } + @Override + public void setup(OperatorContext context) + { + } - @Override - public void teardown() - { - } + @Override + public void teardown() + { + } - @Override - public void beginWindow(long windowId) - { - collect = new HashMap<String, Integer>(); - } + @Override + public void beginWindow(long windowId) + { + collect = new HashMap<String, Integer>(); + } - /** + /** * The output port on which sums are emitted. */ - public final transient DefaultOutputPort<Map<String, Integer>> outport = new DefaultOutputPort<Map<String, Integer>>(); + public final transient DefaultOutputPort<Map<String, Integer>> outport = new DefaultOutputPort<Map<String, Integer>>(); - @Override - public void endWindow() - { - outport.emit(collect); - } + @Override + public void endWindow() + { + outport.emit(collect); + } } http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/3735316e/library/src/main/java/com/datatorrent/lib/testbench/RandomEventGenerator.java ---------------------------------------------------------------------- diff --git a/library/src/main/java/com/datatorrent/lib/testbench/RandomEventGenerator.java b/library/src/main/java/com/datatorrent/lib/testbench/RandomEventGenerator.java index f8c0b51..55b5c04 100644 --- a/library/src/main/java/com/datatorrent/lib/testbench/RandomEventGenerator.java +++ b/library/src/main/java/com/datatorrent/lib/testbench/RandomEventGenerator.java @@ -18,14 +18,15 @@ */ package com.datatorrent.lib.testbench; -import com.datatorrent.common.util.BaseOperator; -import com.datatorrent.api.DefaultOutputPort; -import com.datatorrent.api.InputOperator; -import com.datatorrent.api.Context.OperatorContext; - import java.util.Random; + import javax.validation.constraints.Min; +import com.datatorrent.api.Context.OperatorContext; +import com.datatorrent.api.DefaultOutputPort; +import com.datatorrent.api.InputOperator; +import com.datatorrent.common.util.BaseOperator; + /** * Generates synthetic load. Creates tuples using random numbers and keeps emitting them on the output port string_data and integer_data. * <p> @@ -117,7 +118,8 @@ public class RandomEventGenerator extends BaseOperator implements InputOperator tuplesBlast = i; } - public void setTuplesBlastIntervalMillis(int tuplesBlastIntervalMillis) { + public void setTuplesBlastIntervalMillis(int tuplesBlastIntervalMillis) + { this.tuplesBlastIntervalMillis = tuplesBlastIntervalMillis; } @@ -172,6 +174,7 @@ public class RandomEventGenerator extends BaseOperator implements InputOperator try { Thread.sleep(tuplesBlastIntervalMillis); } catch (InterruptedException e) { + //fixme } } } http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/3735316e/library/src/main/java/com/datatorrent/lib/testbench/RandomWordGenerator.java ---------------------------------------------------------------------- diff --git a/library/src/main/java/com/datatorrent/lib/testbench/RandomWordGenerator.java b/library/src/main/java/com/datatorrent/lib/testbench/RandomWordGenerator.java index e88f06f..6d73cab 100644 --- a/library/src/main/java/com/datatorrent/lib/testbench/RandomWordGenerator.java +++ b/library/src/main/java/com/datatorrent/lib/testbench/RandomWordGenerator.java @@ -18,11 +18,13 @@ */ package com.datatorrent.lib.testbench; +import java.util.Random; + +import javax.validation.constraints.Min; + import com.datatorrent.api.Context.OperatorContext; import com.datatorrent.api.DefaultOutputPort; import com.datatorrent.api.InputOperator; -import java.util.Random; -import javax.validation.constraints.Min; /** * This is an input operator which generates random tuples that are an array of bytes. @@ -87,10 +89,7 @@ public class RandomWordGenerator implements InputOperator @Override public void emitTuples() { - for(; - tupleCounter < tuplesPerWindow; - tupleCounter++) - { + for (; tupleCounter < tuplesPerWindow; tupleCounter++) { byte[] bytes = new byte[tupleSize]; random.nextBytes(bytes); output.emit(bytes); http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/3735316e/library/src/main/java/com/datatorrent/lib/testbench/RedisSumOper.java ---------------------------------------------------------------------- diff --git a/library/src/main/java/com/datatorrent/lib/testbench/RedisSumOper.java b/library/src/main/java/com/datatorrent/lib/testbench/RedisSumOper.java index 1e03b29..8c5a24e 100644 --- a/library/src/main/java/com/datatorrent/lib/testbench/RedisSumOper.java +++ b/library/src/main/java/com/datatorrent/lib/testbench/RedisSumOper.java @@ -22,10 +22,10 @@ import java.util.ArrayList; import java.util.HashMap; import java.util.Map; -import com.datatorrent.common.util.BaseOperator; +import com.datatorrent.api.Context.OperatorContext; import com.datatorrent.api.DefaultInputPort; import com.datatorrent.api.DefaultOutputPort; -import com.datatorrent.api.Context.OperatorContext; +import com.datatorrent.common.util.BaseOperator; /** * This operator collects integer tuples, then emits their sum at the end of the window. @@ -37,46 +37,50 @@ import com.datatorrent.api.Context.OperatorContext; */ public class RedisSumOper extends BaseOperator { - private ArrayList<Integer> collect; + private ArrayList<Integer> collect; /** * This is the input port which receives integer tuples to be summed. */ - public final transient DefaultInputPort<Integer> inport = new DefaultInputPort<Integer>() { - @Override - public void process(Integer s) { - collect.add(s); - } - }; + public final transient DefaultInputPort<Integer> inport = new DefaultInputPort<Integer>() + { + @Override + public void process(Integer s) + { + collect.add(s); + } + }; - @Override - public void setup(OperatorContext context) - { - } + @Override + public void setup(OperatorContext context) + { + } - @Override - public void teardown() - { - } + @Override + public void teardown() + { + } - @Override - public void beginWindow(long windowId) - { - collect = new ArrayList<Integer>(); - } + @Override + public void beginWindow(long windowId) + { + collect = new ArrayList<Integer>(); + } - /** + /** * This is the output port which emits the summed tuples. */ - public final transient DefaultOutputPort<Map<Integer, Integer>> outport = new DefaultOutputPort<Map<Integer, Integer>>(); + public final transient DefaultOutputPort<Map<Integer, Integer>> outport = new DefaultOutputPort<Map<Integer, Integer>>(); - @Override - public void endWindow() - { - Integer sum = 0; - for(Integer entry : collect) sum += entry; - Map<Integer, Integer> tuple = new HashMap<Integer, Integer>(); - tuple.put(1, sum); - outport.emit(tuple); - } + @Override + public void endWindow() + { + Integer sum = 0; + for (Integer entry : collect) { + sum += entry; + } + Map<Integer, Integer> tuple = new HashMap<Integer, Integer>(); + tuple.put(1, sum); + outport.emit(tuple); + } } http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/3735316e/library/src/main/java/com/datatorrent/lib/testbench/SeedEventClassifier.java ---------------------------------------------------------------------- diff --git a/library/src/main/java/com/datatorrent/lib/testbench/SeedEventClassifier.java b/library/src/main/java/com/datatorrent/lib/testbench/SeedEventClassifier.java index 20fefc8..efbdca8 100644 --- a/library/src/main/java/com/datatorrent/lib/testbench/SeedEventClassifier.java +++ b/library/src/main/java/com/datatorrent/lib/testbench/SeedEventClassifier.java @@ -18,14 +18,15 @@ */ package com.datatorrent.lib.testbench; -import com.datatorrent.common.util.BaseOperator; -import com.datatorrent.api.DefaultInputPort; -import com.datatorrent.api.DefaultOutputPort; -import com.datatorrent.api.Context.OperatorContext; - import java.util.HashMap; + import javax.validation.constraints.NotNull; +import com.datatorrent.api.Context.OperatorContext; +import com.datatorrent.api.DefaultInputPort; +import com.datatorrent.api.DefaultOutputPort; +import com.datatorrent.common.util.BaseOperator; + /** * This operator receives data on two input ports (data1, and data2). * Each incoming tuple is given a seed value @@ -129,6 +130,7 @@ public class SeedEventClassifier<T> extends BaseOperator seed = s_start; } } + /** * Data for classification values */ http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/3735316e/library/src/main/java/com/datatorrent/lib/testbench/SeedEventGenerator.java ---------------------------------------------------------------------- diff --git a/library/src/main/java/com/datatorrent/lib/testbench/SeedEventGenerator.java b/library/src/main/java/com/datatorrent/lib/testbench/SeedEventGenerator.java index f1541a5..3d02051 100644 --- a/library/src/main/java/com/datatorrent/lib/testbench/SeedEventGenerator.java +++ b/library/src/main/java/com/datatorrent/lib/testbench/SeedEventGenerator.java @@ -18,16 +18,18 @@ */ package com.datatorrent.lib.testbench; -import com.datatorrent.common.util.BaseOperator; -import com.datatorrent.api.DefaultOutputPort; -import com.datatorrent.api.InputOperator; -import com.datatorrent.lib.util.KeyValPair; import java.util.ArrayList; import java.util.HashMap; import java.util.Random; + import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import com.datatorrent.api.DefaultOutputPort; +import com.datatorrent.api.InputOperator; +import com.datatorrent.common.util.BaseOperator; +import com.datatorrent.lib.util.KeyValPair; + /** * Generates a one time seed load based on the range provided by the keys, * and adds new classification to incoming keys. @@ -135,8 +137,7 @@ public class SeedEventGenerator extends BaseOperator implements InputOperator for (int i = lstart; i < lend; i++) { emitTuple(i); } - } - else { + } else { for (int i = lstart; i > lend; i--) { emitTuple(i); } http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/3735316e/library/src/main/java/com/datatorrent/lib/testbench/SumTestSink.java ---------------------------------------------------------------------- diff --git a/library/src/main/java/com/datatorrent/lib/testbench/SumTestSink.java b/library/src/main/java/com/datatorrent/lib/testbench/SumTestSink.java index c10c784..03f0c42 100644 --- a/library/src/main/java/com/datatorrent/lib/testbench/SumTestSink.java +++ b/library/src/main/java/com/datatorrent/lib/testbench/SumTestSink.java @@ -45,7 +45,7 @@ public class SumTestSink<T> implements Sink<T> public void put(T payload) { if (payload instanceof Number) { - val += ((Number) payload).doubleValue(); + val += ((Number)payload).doubleValue(); } } http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/3735316e/library/src/main/java/com/datatorrent/lib/testbench/ThroughputCounter.java ---------------------------------------------------------------------- diff --git a/library/src/main/java/com/datatorrent/lib/testbench/ThroughputCounter.java b/library/src/main/java/com/datatorrent/lib/testbench/ThroughputCounter.java index c645619..72a0b1f 100644 --- a/library/src/main/java/com/datatorrent/lib/testbench/ThroughputCounter.java +++ b/library/src/main/java/com/datatorrent/lib/testbench/ThroughputCounter.java @@ -18,18 +18,19 @@ */ package com.datatorrent.lib.testbench; - -import com.datatorrent.common.util.BaseOperator; -import com.datatorrent.api.DefaultInputPort; -import com.datatorrent.api.DefaultOutputPort; -import com.datatorrent.api.Context.OperatorContext; - import java.util.HashMap; import java.util.Map; + import javax.validation.constraints.Min; + import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import com.datatorrent.api.Context.OperatorContext; +import com.datatorrent.api.DefaultInputPort; +import com.datatorrent.api.DefaultOutputPort; +import com.datatorrent.common.util.BaseOperator; + /** * This operator expects incoming tuples to be of type HashMap<String, Integer>. * These values are throughput per window from upstream operators. @@ -156,8 +157,7 @@ public class ThroughputCounter<K, V extends Number> extends BaseOperator long tuples_per_sec = (tuple_count * 1000) / elapsedTime; // * 1000 as elapsedTime is in millis if (rolling_window_count == 1) { average = tuples_per_sec; - } - else { // use tuple_numbers + } else { // use tuple_numbers long slots; if (count_denominator == rolling_window_count) { tuple_numbers[tuple_index] = tuple_count; @@ -167,8 +167,7 @@ public class ThroughputCounter<K, V extends Number> extends BaseOperator if (tuple_index == rolling_window_count) { tuple_index = 0; } - } - else { + } else { tuple_numbers[count_denominator - 1] = tuple_count; time_numbers[count_denominator - 1] = elapsedTime; slots = count_denominator; http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/3735316e/library/src/main/java/com/datatorrent/lib/testbench/TopOccurrence.java ---------------------------------------------------------------------- diff --git a/library/src/main/java/com/datatorrent/lib/testbench/TopOccurrence.java b/library/src/main/java/com/datatorrent/lib/testbench/TopOccurrence.java index 24e742b..3f3da57 100644 --- a/library/src/main/java/com/datatorrent/lib/testbench/TopOccurrence.java +++ b/library/src/main/java/com/datatorrent/lib/testbench/TopOccurrence.java @@ -23,9 +23,9 @@ import java.util.Collections; import java.util.HashMap; import java.util.Map; -import com.datatorrent.common.util.BaseOperator; import com.datatorrent.api.DefaultInputPort; import com.datatorrent.api.DefaultOutputPort; +import com.datatorrent.common.util.BaseOperator; /** * This operator consumes key value pairs of strings and integers. @@ -38,110 +38,106 @@ import com.datatorrent.api.DefaultOutputPort; */ public class TopOccurrence extends BaseOperator { - // n value - private int n = 5; - private int threshold = 5; + // n value + private int n = 5; + private int threshold = 5; /** * */ - public final transient DefaultOutputPort<Map<Integer, String>> outport = new DefaultOutputPort<Map<Integer, String>>(); - /** + public final transient DefaultOutputPort<Map<Integer, String>> outport = new DefaultOutputPort<>(); + /** * */ - public final transient DefaultOutputPort<Map<Integer, String>> gtThreshold = new DefaultOutputPort<Map<Integer, String>>(); + public final transient DefaultOutputPort<Map<Integer, String>> gtThreshold = new DefaultOutputPort<>(); - // input port - public final transient DefaultInputPort<Map<String, Integer>> inport = - new DefaultInputPort<Map<String, Integer>>() { + // input port + public final transient DefaultInputPort<Map<String, Integer>> inport = new DefaultInputPort<Map<String, Integer>>() + { @Override public void process(Map<String, Integer> tuple) { int numOuts = 0; - if (tuple.size() < n) - { - for (Map.Entry<String, Integer> entry : tuple.entrySet()) - { - Map<Integer, String> out = new HashMap<Integer, String>(); - String value = new StringBuilder(entry.getKey()).append("##").append(entry.getValue()).toString(); - out.put(numOuts++, value); - outport.emit(out); - } - while(numOuts < n) - { - Map<Integer, String> out = new HashMap<Integer, String>(); - out.put(numOuts++, ""); - outport.emit(out); - } + if (tuple.size() < n) { + for (Map.Entry<String, Integer> entry : tuple.entrySet()) { + Map<Integer, String> out = new HashMap<Integer, String>(); + String value = new StringBuilder(entry.getKey()).append("##").append(entry.getValue()).toString(); + out.put(numOuts++, value); + outport.emit(out); + } + while (numOuts < n) { + Map<Integer, String> out = new HashMap<Integer, String>(); + out.put(numOuts++, ""); + outport.emit(out); + } } else { - ArrayList<Integer> values = new ArrayList<Integer>(); - for (Map.Entry<String, Integer> entry : tuple.entrySet()) - { - values.add(entry.getValue()); - } - Collections.sort(values); - for (int i=values.size()-1; i >= 0; i--) - { - for (Map.Entry<String, Integer> entry : tuple.entrySet()) - { - if (entry.getValue() == values.get(i)) - { - Map<Integer, String> out = new HashMap<Integer, String>(); - String value = new StringBuilder(entry.getKey()).append("##").append(entry.getValue()).toString(); - out.put(numOuts++, value); - outport.emit(out); - } - if (numOuts >= n) break; - } - if (numOuts >= n) break; - } + ArrayList<Integer> values = new ArrayList<Integer>(); + for (Map.Entry<String, Integer> entry : tuple.entrySet()) { + values.add(entry.getValue()); + } + Collections.sort(values); + + for (int i = values.size() - 1; i >= 0; i--) { + for (Map.Entry<String, Integer> entry : tuple.entrySet()) { + if (entry.getValue() == values.get(i)) { + Map<Integer, String> out = new HashMap<Integer, String>(); + String value = new StringBuilder(entry.getKey()).append("##").append(entry.getValue()).toString(); + out.put(numOuts++, value); + outport.emit(out); + } + if (numOuts >= n) { + break; + } + } + if (numOuts >= n) { + break; + } + } } // output greater than threshold numOuts = 1; - for (Map.Entry<String, Integer> entry : tuple.entrySet()) - { - if (entry.getValue() > threshold) - { - Map<Integer, String> out = new HashMap<Integer, String>(); - String value = new StringBuilder(entry.getKey()).append("##").append(entry.getValue()).toString(); - out.put(numOuts++, value); - gtThreshold.emit(out); - } + for (Map.Entry<String, Integer> entry : tuple.entrySet()) { + if (entry.getValue() > threshold) { + Map<Integer, String> out = new HashMap<Integer, String>(); + String value = new StringBuilder(entry.getKey()).append("##").append(entry.getValue()).toString(); + out.put(numOuts++, value); + gtThreshold.emit(out); + } } Map<Integer, String> out = new HashMap<Integer, String>(); - out.put(0, new Integer(numOuts).toString()); - gtThreshold.emit(out); - } - }; + out.put(0, new Integer(numOuts).toString()); + gtThreshold.emit(out); + } + }; - public int getN() - { - return n; - } + public int getN() + { + return n; + } - /** - * Output n top values - * @param n - */ - public void setN(int n) - { - this.n = n; - } + /** + * Output n top values + * @param n + */ + public void setN(int n) + { + this.n = n; + } - public int getThreshold() - { - return threshold; - } + public int getThreshold() + { + return threshold; + } - /** - * Emit the tuples only if it's value is greater than the threshold. - * @param threshold - */ - public void setThreshold(int threshold) - { - this.threshold = threshold; - } + /** + * Emit the tuples only if it's value is greater than the threshold. + * @param threshold + */ + public void setThreshold(int threshold) + { + this.threshold = threshold; + } } http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/3735316e/library/src/main/java/com/datatorrent/lib/transform/TransformOperator.java ---------------------------------------------------------------------- diff --git a/library/src/main/java/com/datatorrent/lib/transform/TransformOperator.java b/library/src/main/java/com/datatorrent/lib/transform/TransformOperator.java index a40bb97..309560b 100644 --- a/library/src/main/java/com/datatorrent/lib/transform/TransformOperator.java +++ b/library/src/main/java/com/datatorrent/lib/transform/TransformOperator.java @@ -89,7 +89,8 @@ public class TransformOperator extends BaseOperator implements Operator.Activati @OutputPortFieldAnnotation(schemaRequired = true) public final transient DefaultOutputPort<Object> output = new DefaultOutputPort<Object>() { - @Override public void setup(Context.PortContext context) + @Override + public void setup(Context.PortContext context) { outputClass = context.getValue(Context.PortContext.TUPLE_CLASS); } http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/3735316e/library/src/main/java/com/datatorrent/lib/util/AbstractBaseFrequentKey.java ---------------------------------------------------------------------- diff --git a/library/src/main/java/com/datatorrent/lib/util/AbstractBaseFrequentKey.java b/library/src/main/java/com/datatorrent/lib/util/AbstractBaseFrequentKey.java index a9e6d29..5a4d721 100644 --- a/library/src/main/java/com/datatorrent/lib/util/AbstractBaseFrequentKey.java +++ b/library/src/main/java/com/datatorrent/lib/util/AbstractBaseFrequentKey.java @@ -53,25 +53,28 @@ public abstract class AbstractBaseFrequentKey<K> extends BaseKeyOperator<K> } count.increment(); } + protected HashMap<K, MutableInt> keycount = new HashMap<K, MutableInt>(); /** * override emitTuple to decide the port to emit to * @param tuple */ - abstract public void emitTuple(HashMap<K,Integer> tuple); + public abstract void emitTuple(HashMap<K,Integer> tuple); + /** * Overide emitList to specify the emit schema * @param tlist */ - abstract public void emitList(ArrayList<HashMap<K, Integer>> tlist); + public abstract void emitList(ArrayList<HashMap<K, Integer>> tlist); + /** * Override compareCount to decide most vs least * @param val1 * @param val2 * @return result of compareCount to be done by subclass */ - abstract public boolean compareCount(int val1, int val2); + public abstract boolean compareCount(int val1, int val2); /** * Emits the result. @@ -88,14 +91,12 @@ public abstract class AbstractBaseFrequentKey<K> extends BaseKeyOperator<K> key = e.getKey(); kval = e.getValue().intValue(); map.put(key, null); - } - else if (compareCount(e.getValue().intValue(), kval)) { + } else if (compareCount(e.getValue().intValue(), kval)) { key = e.getKey(); kval = e.getValue().intValue(); map.clear(); map.put(key, null); - } - else if (e.getValue().intValue() == kval) { + } else if (e.getValue().intValue() == kval) { map.put(e.getKey(), null); } } http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/3735316e/library/src/main/java/com/datatorrent/lib/util/AbstractBaseFrequentKeyValueMap.java ---------------------------------------------------------------------- diff --git a/library/src/main/java/com/datatorrent/lib/util/AbstractBaseFrequentKeyValueMap.java b/library/src/main/java/com/datatorrent/lib/util/AbstractBaseFrequentKeyValueMap.java index 0b2c360..f96b792 100644 --- a/library/src/main/java/com/datatorrent/lib/util/AbstractBaseFrequentKeyValueMap.java +++ b/library/src/main/java/com/datatorrent/lib/util/AbstractBaseFrequentKeyValueMap.java @@ -18,12 +18,13 @@ */ package com.datatorrent.lib.util; -import com.datatorrent.api.DefaultInputPort; - import java.util.HashMap; import java.util.Map; + import org.apache.commons.lang.mutable.MutableInt; +import com.datatorrent.api.DefaultInputPort; + /** * This is the base implementation of an operator, which takes key value pairs as inputs. * It counts the number of times each key value pair occurs. @@ -104,14 +105,12 @@ public abstract class AbstractBaseFrequentKeyValueMap<K, V> extends BaseKeyValue val = v.getKey(); kval = v.getValue().intValue(); vmap.put(val, null); - } - else if (compareValue(v.getValue().intValue(), kval)) { + } else if (compareValue(v.getValue().intValue(), kval)) { val = v.getKey(); kval = v.getValue().intValue(); vmap.clear(); vmap.put(val, null); - } - else if (v.getValue().intValue() == kval) { + } else if (v.getValue().intValue() == kval) { vmap.put(v.getKey(), null); } } http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/3735316e/library/src/main/java/com/datatorrent/lib/util/AbstractBaseMatchOperator.java ---------------------------------------------------------------------- diff --git a/library/src/main/java/com/datatorrent/lib/util/AbstractBaseMatchOperator.java b/library/src/main/java/com/datatorrent/lib/util/AbstractBaseMatchOperator.java index b7f4d97..f7d78db 100644 --- a/library/src/main/java/com/datatorrent/lib/util/AbstractBaseMatchOperator.java +++ b/library/src/main/java/com/datatorrent/lib/util/AbstractBaseMatchOperator.java @@ -18,11 +18,11 @@ */ package com.datatorrent.lib.util; -import com.datatorrent.api.Context.OperatorContext; -import com.datatorrent.lib.util.BaseKeyValueOperator; import javax.validation.constraints.NotNull; import javax.validation.constraints.Pattern; +import com.datatorrent.api.Context.OperatorContext; + /** * This is the base implementation of operators which perform comparisons. * A concrete operator should be created from this skeleton implementation. @@ -60,7 +60,8 @@ public abstract class AbstractBaseMatchOperator<K,V extends Comparable> extends public enum supported_type { LTE, LT, EQ, NEQ, GT, GTE - }; + } + supported_type type = supported_type.EQ; /** @@ -166,23 +167,17 @@ public abstract class AbstractBaseMatchOperator<K,V extends Comparable> extends { if (cmp.equals("lt")) { setTypeLT(); - } - else if (cmp.equals("lte")) { + } else if (cmp.equals("lte")) { setTypeLTE(); - } - else if (cmp.equals("eq")) { + } else if (cmp.equals("eq")) { setTypeEQ(); - } - else if (cmp.equals("ne")) { + } else if (cmp.equals("ne")) { setTypeEQ(); - } - else if (cmp.equals("gt")) { + } else if (cmp.equals("gt")) { setTypeGT(); - } - else if (cmp.equals("gte")) { + } else if (cmp.equals("gte")) { setTypeGTE(); - } - else { + } else { setTypeEQ(); } } http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/3735316e/library/src/main/java/com/datatorrent/lib/util/AbstractBaseNNonUniqueOperatorMap.java ---------------------------------------------------------------------- diff --git a/library/src/main/java/com/datatorrent/lib/util/AbstractBaseNNonUniqueOperatorMap.java b/library/src/main/java/com/datatorrent/lib/util/AbstractBaseNNonUniqueOperatorMap.java index ef4c9e4..47282b1 100644 --- a/library/src/main/java/com/datatorrent/lib/util/AbstractBaseNNonUniqueOperatorMap.java +++ b/library/src/main/java/com/datatorrent/lib/util/AbstractBaseNNonUniqueOperatorMap.java @@ -52,13 +52,13 @@ public abstract class AbstractBaseNNonUniqueOperatorMap<K, V> extends AbstractBa * Override to decide the direction (ascending vs descending) * @return true if ascending, to be done by sub-class */ - abstract public boolean isAscending(); + public abstract boolean isAscending(); /** * Override to decide which port to emit to and its schema * @param tuple */ - abstract public void emit(HashMap<K, ArrayList<V>> tuple); + public abstract void emit(HashMap<K, ArrayList<V>> tuple); /** * @@ -75,8 +75,7 @@ public abstract class AbstractBaseNNonUniqueOperatorMap<K, V> extends AbstractBa pqueue = new TopNSort<V>(5, n, isAscending()); kmap.put(cloneKey(e.getKey()), pqueue); pqueue.offer(cloneValue(e.getValue())); - } - else { + } else { pqueue.offer(e.getValue()); } } http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/3735316e/library/src/main/java/com/datatorrent/lib/util/AbstractBaseNOperatorMap.java ---------------------------------------------------------------------- diff --git a/library/src/main/java/com/datatorrent/lib/util/AbstractBaseNOperatorMap.java b/library/src/main/java/com/datatorrent/lib/util/AbstractBaseNOperatorMap.java index 236a05c..4f6ab85 100644 --- a/library/src/main/java/com/datatorrent/lib/util/AbstractBaseNOperatorMap.java +++ b/library/src/main/java/com/datatorrent/lib/util/AbstractBaseNOperatorMap.java @@ -52,7 +52,7 @@ import com.datatorrent.api.StreamCodec; * @tags rank, key value * @since 0.3.2 */ -abstract public class AbstractBaseNOperatorMap<K,V> extends BaseKeyValueOperator<K,V> +public abstract class AbstractBaseNOperatorMap<K,V> extends BaseKeyValueOperator<K,V> { /** * This is the input port that receives key value pairs. @@ -86,7 +86,7 @@ abstract public class AbstractBaseNOperatorMap<K,V> extends BaseKeyValueOperator * * @param tuple */ - abstract public void processTuple(Map<K,V> tuple); + public abstract void processTuple(Map<K,V> tuple); /** * Sets value of N (depth)
