http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/3735316e/library/src/main/java/com/datatorrent/lib/statistics/WeightedMeanOperator.java ---------------------------------------------------------------------- diff --git a/library/src/main/java/com/datatorrent/lib/statistics/WeightedMeanOperator.java b/library/src/main/java/com/datatorrent/lib/statistics/WeightedMeanOperator.java index a312962..7aa86be 100644 --- a/library/src/main/java/com/datatorrent/lib/statistics/WeightedMeanOperator.java +++ b/library/src/main/java/com/datatorrent/lib/statistics/WeightedMeanOperator.java @@ -82,7 +82,9 @@ public class WeightedMeanOperator<V extends Number> extends BaseNumberValueOper @Override public void process(V tuple) { - if (tuple.doubleValue() != 0.0) currentWeight = tuple.doubleValue(); + if (tuple.doubleValue() != 0.0) { + currentWeight = tuple.doubleValue(); + } } }; @@ -101,7 +103,7 @@ public class WeightedMeanOperator<V extends Number> extends BaseNumberValueOper public void endWindow() { if (weightedCount != 0.0) { - mean.emit(getAverage()); + mean.emit(getAverage()); } weightedSum = 0.0; weightedCount = 0.0; @@ -123,21 +125,21 @@ public class WeightedMeanOperator<V extends Number> extends BaseNumberValueOper val = num.doubleValue() / weightedCount; break; case INTEGER: - val = (int) (num.intValue() / weightedCount); + val = (int)(num.intValue() / weightedCount); break; case FLOAT: val = new Float(num.floatValue() / weightedCount); break; case LONG: - val = (long) (num.longValue() / weightedCount); + val = (long)(num.longValue() / weightedCount); break; case SHORT: - val = (short) (num.shortValue() / weightedCount); + val = (short)(num.shortValue() / weightedCount); break; default: val = num.doubleValue() / weightedCount; break; } - return (V) val; + return (V)val; } }
http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/3735316e/library/src/main/java/com/datatorrent/lib/stream/AbstractAggregator.java ---------------------------------------------------------------------- diff --git a/library/src/main/java/com/datatorrent/lib/stream/AbstractAggregator.java b/library/src/main/java/com/datatorrent/lib/stream/AbstractAggregator.java index 9a8daa3..abb6be5 100644 --- a/library/src/main/java/com/datatorrent/lib/stream/AbstractAggregator.java +++ b/library/src/main/java/com/datatorrent/lib/stream/AbstractAggregator.java @@ -48,97 +48,97 @@ import com.datatorrent.api.annotation.OperatorAnnotation; @OperatorAnnotation(partitionable = false) public abstract class AbstractAggregator<T> implements Operator { - /** - * collection of input values. - */ - protected Collection<T> collection; - @Min(0) - /** - * size of output collection, all tuples till end window if set to 0. - */ - private int size = 0; + /** + * collection of input values. + */ + protected Collection<T> collection; + @Min(0) + /** + * size of output collection, all tuples till end window if set to 0. + */ + private int size = 0; - /** - * Input port that takes data to be added to a collection. - */ - public final transient DefaultInputPort<T> input = new DefaultInputPort<T>() - { - @Override - public void process(T tuple) - { - if (collection == null) { - collection = getNewCollection(size); - } - collection.add(tuple); - if (collection.size() == size) { - output.emit(collection); - collection = null; - } - } + /** + * Input port that takes data to be added to a collection. + */ + public final transient DefaultInputPort<T> input = new DefaultInputPort<T>() + { + @Override + public void process(T tuple) + { + if (collection == null) { + collection = getNewCollection(size); + } + collection.add(tuple); + if (collection.size() == size) { + output.emit(collection); + collection = null; + } + } - }; + }; - /** - * Output port that emits a collection. - */ - public final transient DefaultOutputPort<Collection<T>> output = new DefaultOutputPort<Collection<T>>(); + /** + * Output port that emits a collection. + */ + public final transient DefaultOutputPort<Collection<T>> output = new DefaultOutputPort<Collection<T>>(); - /** - * Set the size of the collection. - * - * If set to zero, the collection collects all the tuples within a window and - * emits the collection as 1 output tuple at the end of the window. If set to - * positive value, it collects the collection as soon as the size of the - * collection reaches the size. - * - * @param size - * the size to set - */ - public void setSize(int size) - { - this.size = size; - } + /** + * Set the size of the collection. + * + * If set to zero, the collection collects all the tuples within a window and + * emits the collection as 1 output tuple at the end of the window. If set to + * positive value, it collects the collection as soon as the size of the + * collection reaches the size. + * + * @param size + * the size to set + */ + public void setSize(int size) + { + this.size = size; + } - /** - * Size of collection. - * - * @return size of collection - */ - @Min(0) - public int getSize() - { - return size; - } + /** + * Size of collection. + * + * @return size of collection + */ + @Min(0) + public int getSize() + { + return size; + } - /** - * Abstract method to get collection of given size. - * - * @param size - * @return collection - */ - public abstract Collection<T> getNewCollection(int size); + /** + * Abstract method to get collection of given size. + * + * @param size + * @return collection + */ + public abstract Collection<T> getNewCollection(int size); - @Override - public void beginWindow(long windowId) - { - } + @Override + public void beginWindow(long windowId) + { + } - @Override - public void endWindow() - { - if (size == 0 && collection != null) { - output.emit(collection); - collection = null; - } - } + @Override + public void endWindow() + { + if (size == 0 && collection != null) { + output.emit(collection); + collection = null; + } + } - @Override - public void setup(OperatorContext context) - { - } + @Override + public void setup(OperatorContext context) + { + } - @Override - public void teardown() - { - } + @Override + public void teardown() + { + } } http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/3735316e/library/src/main/java/com/datatorrent/lib/stream/ArrayListToItem.java ---------------------------------------------------------------------- diff --git a/library/src/main/java/com/datatorrent/lib/stream/ArrayListToItem.java b/library/src/main/java/com/datatorrent/lib/stream/ArrayListToItem.java index 106d16d..efd807a 100644 --- a/library/src/main/java/com/datatorrent/lib/stream/ArrayListToItem.java +++ b/library/src/main/java/com/datatorrent/lib/stream/ArrayListToItem.java @@ -18,11 +18,12 @@ */ package com.datatorrent.lib.stream; +import java.util.ArrayList; + import com.datatorrent.api.DefaultInputPort; import com.datatorrent.api.DefaultOutputPort; import com.datatorrent.api.annotation.Stateless; import com.datatorrent.lib.util.BaseKeyOperator; -import java.util.ArrayList; /** * An implementation of BaseKeyOperator that breaks up an ArrayList tuple into Objects. @@ -42,25 +43,25 @@ import java.util.ArrayList; @Stateless public class ArrayListToItem<K> extends BaseKeyOperator<K> { - /** - * Input data port that takes an arraylist. - */ - public final transient DefaultInputPort<ArrayList<K>> data = new DefaultInputPort<ArrayList<K>>() - { - /** - * Emit one item at a time - */ - @Override - public void process(ArrayList<K> tuple) - { - for (K k : tuple) { - item.emit(cloneKey(k)); - } - } - }; + /** + * Input data port that takes an arraylist. + */ + public final transient DefaultInputPort<ArrayList<K>> data = new DefaultInputPort<ArrayList<K>>() + { + /** + * Emit one item at a time + */ + @Override + public void process(ArrayList<K> tuple) + { + for (K k : tuple) { + item.emit(cloneKey(k)); + } + } + }; - /** - * Output port that emits an array item. - */ - public final transient DefaultOutputPort<K> item = new DefaultOutputPort<K>(); + /** + * Output port that emits an array item. + */ + public final transient DefaultOutputPort<K> item = new DefaultOutputPort<K>(); } http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/3735316e/library/src/main/java/com/datatorrent/lib/stream/ConsolidatorKeyVal.java ---------------------------------------------------------------------- diff --git a/library/src/main/java/com/datatorrent/lib/stream/ConsolidatorKeyVal.java b/library/src/main/java/com/datatorrent/lib/stream/ConsolidatorKeyVal.java index 148f0b5..6874796 100644 --- a/library/src/main/java/com/datatorrent/lib/stream/ConsolidatorKeyVal.java +++ b/library/src/main/java/com/datatorrent/lib/stream/ConsolidatorKeyVal.java @@ -50,137 +50,137 @@ import com.datatorrent.lib.util.KeyValPair; @OperatorAnnotation(partitionable = false) public class ConsolidatorKeyVal<K, V1, V2, V3, V4, V5> implements Operator { - /** - * key/array values output result. - */ - protected HashMap<K, ArrayList<Object>> result; - - @Override - public void setup(OperatorContext context) - { - } - - @Override - public void teardown() - { - } - - /** - * <p> - * Class operates on <K,V> pair, stores value in given number position in - * list. <br> - * - * @param <V> - * value type. - */ - public class ConsolidatorInputPort<V> extends - DefaultInputPort<KeyValPair<K, V>> - { - /** - * Value position in list. - */ - private int number; - - /** - * Constructor - * - * @param oper - * Connected operator. - * @param num - * Value position in list. - */ - ConsolidatorInputPort(Operator oper, int num) - { - super(); - number = num; - } - - /** - * Process key/value pair. - */ - @Override - public void process(KeyValPair<K, V> tuple) - { - K key = tuple.getKey(); - ArrayList<Object> list = getObject(key); - list.set(number, tuple.getValue()); - } - - } - - /** - * V1 type value input port. - */ - public final transient ConsolidatorInputPort<V1> in1 = new ConsolidatorInputPort<V1>( - this, 0); - - /** - * V2 type value input port. - */ - public final transient ConsolidatorInputPort<V2> in2 = new ConsolidatorInputPort<V2>( - this, 1); - - /** - * V3 type value input port. - */ - @InputPortFieldAnnotation(optional = true) - public final transient ConsolidatorInputPort<V3> in3 = new ConsolidatorInputPort<V3>( - this, 2); - - /** - * V4 type value input port. - */ - @InputPortFieldAnnotation(optional = true) - public final transient ConsolidatorInputPort<V4> in4 = new ConsolidatorInputPort<V4>( - this, 3); - - /** - * V5 type value input port. - */ - @InputPortFieldAnnotation(optional = true) - public final transient ConsolidatorInputPort<V5> in5 = new ConsolidatorInputPort<V5>( - this, 4); - - /** - * Output port that emits a hashmap of <key,arraylist>. - */ - public final transient DefaultOutputPort<HashMap<K, ArrayList<Object>>> out = new DefaultOutputPort<HashMap<K, ArrayList<Object>>>(); - - /** - * Get array list object for given key - * - * @param k key - * @return array list for key. - */ - public ArrayList<Object> getObject(K k) - { - ArrayList<Object> val = result.get(k); - if (val == null) { - val = new ArrayList<Object>(5); - val.add(0, null); - val.add(1, null); - val.add(2, null); - val.add(3, null); - val.add(4, null); - result.put(k, val); - } - return val; - } - - @Override - public void beginWindow(long windowId) - { - result = new HashMap<K, ArrayList<Object>>(); - } - - /** - * Emits merged data - */ - @Override - public void endWindow() - { - if (!result.isEmpty()) { - out.emit(result); - } - } + /** + * key/array values output result. + */ + protected HashMap<K, ArrayList<Object>> result; + + @Override + public void setup(OperatorContext context) + { + } + + @Override + public void teardown() + { + } + + /** + * <p> + * Class operates on <K,V> pair, stores value in given number position in + * list. <br> + * + * @param <V> + * value type. + */ + public class ConsolidatorInputPort<V> extends + DefaultInputPort<KeyValPair<K, V>> + { + /** + * Value position in list. + */ + private int number; + + /** + * Constructor + * + * @param oper + * Connected operator. + * @param num + * Value position in list. + */ + ConsolidatorInputPort(Operator oper, int num) + { + super(); + number = num; + } + + /** + * Process key/value pair. + */ + @Override + public void process(KeyValPair<K, V> tuple) + { + K key = tuple.getKey(); + ArrayList<Object> list = getObject(key); + list.set(number, tuple.getValue()); + } + + } + + /** + * V1 type value input port. + */ + public final transient ConsolidatorInputPort<V1> in1 = new ConsolidatorInputPort<V1>( + this, 0); + + /** + * V2 type value input port. + */ + public final transient ConsolidatorInputPort<V2> in2 = new ConsolidatorInputPort<V2>( + this, 1); + + /** + * V3 type value input port. + */ + @InputPortFieldAnnotation(optional = true) + public final transient ConsolidatorInputPort<V3> in3 = new ConsolidatorInputPort<V3>( + this, 2); + + /** + * V4 type value input port. + */ + @InputPortFieldAnnotation(optional = true) + public final transient ConsolidatorInputPort<V4> in4 = new ConsolidatorInputPort<V4>( + this, 3); + + /** + * V5 type value input port. + */ + @InputPortFieldAnnotation(optional = true) + public final transient ConsolidatorInputPort<V5> in5 = new ConsolidatorInputPort<V5>( + this, 4); + + /** + * Output port that emits a hashmap of <key,arraylist>. + */ + public final transient DefaultOutputPort<HashMap<K, ArrayList<Object>>> out = new DefaultOutputPort<HashMap<K, ArrayList<Object>>>(); + + /** + * Get array list object for given key + * + * @param k key + * @return array list for key. + */ + public ArrayList<Object> getObject(K k) + { + ArrayList<Object> val = result.get(k); + if (val == null) { + val = new ArrayList<Object>(5); + val.add(0, null); + val.add(1, null); + val.add(2, null); + val.add(3, null); + val.add(4, null); + result.put(k, val); + } + return val; + } + + @Override + public void beginWindow(long windowId) + { + result = new HashMap<K, ArrayList<Object>>(); + } + + /** + * Emits merged data + */ + @Override + public void endWindow() + { + if (!result.isEmpty()) { + out.emit(result); + } + } } http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/3735316e/library/src/main/java/com/datatorrent/lib/stream/Counter.java ---------------------------------------------------------------------- diff --git a/library/src/main/java/com/datatorrent/lib/stream/Counter.java b/library/src/main/java/com/datatorrent/lib/stream/Counter.java index 67aedb4..8de2653 100644 --- a/library/src/main/java/com/datatorrent/lib/stream/Counter.java +++ b/library/src/main/java/com/datatorrent/lib/stream/Counter.java @@ -42,59 +42,59 @@ import com.datatorrent.api.Operator.Unifier; */ public class Counter implements Operator, Unifier<Integer> { - /** - * Input port that takes objects to be counted in each window. - */ - public final transient DefaultInputPort<Object> input = new DefaultInputPort<Object>() - { - @Override - public void process(Object tuple) - { - count++; - } + /** + * Input port that takes objects to be counted in each window. + */ + public final transient DefaultInputPort<Object> input = new DefaultInputPort<Object>() + { + @Override + public void process(Object tuple) + { + count++; + } - }; + }; - /** - * Output port that takes emits count in each window. - */ - public final transient DefaultOutputPort<Integer> output = new DefaultOutputPort<Integer>() - { - @Override - public Unifier<Integer> getUnifier() - { - return Counter.this; - } + /** + * Output port that takes emits count in each window. + */ + public final transient DefaultOutputPort<Integer> output = new DefaultOutputPort<Integer>() + { + @Override + public Unifier<Integer> getUnifier() + { + return Counter.this; + } - }; + }; - @Override - public void beginWindow(long windowId) - { - count = 0; - } + @Override + public void beginWindow(long windowId) + { + count = 0; + } - @Override - public void process(Integer tuple) - { - count += tuple; - } + @Override + public void process(Integer tuple) + { + count += tuple; + } - @Override - public void endWindow() - { - output.emit(count); - } + @Override + public void endWindow() + { + output.emit(count); + } - @Override - public void setup(OperatorContext context) - { - } + @Override + public void setup(OperatorContext context) + { + } - @Override - public void teardown() - { - } + @Override + public void teardown() + { + } - private transient int count; + private transient int count; } http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/3735316e/library/src/main/java/com/datatorrent/lib/stream/DevNull.java ---------------------------------------------------------------------- diff --git a/library/src/main/java/com/datatorrent/lib/stream/DevNull.java b/library/src/main/java/com/datatorrent/lib/stream/DevNull.java index 8ced16b..877b562 100644 --- a/library/src/main/java/com/datatorrent/lib/stream/DevNull.java +++ b/library/src/main/java/com/datatorrent/lib/stream/DevNull.java @@ -18,9 +18,9 @@ */ package com.datatorrent.lib.stream; -import com.datatorrent.common.util.BaseOperator; import com.datatorrent.api.DefaultInputPort; import com.datatorrent.api.annotation.Stateless; +import com.datatorrent.common.util.BaseOperator; /** * An implementation of BaseOperator that terminates a stream and does not affect the tuple. @@ -39,15 +39,15 @@ import com.datatorrent.api.annotation.Stateless; @Stateless public class DevNull<K> extends BaseOperator { - /** - * Input any data type port. - */ - public final transient DefaultInputPort<K> data = new DefaultInputPort<K>() - { - @Override - public void process(K tuple) - { - // Does nothing; allows a stream to terminate and therefore be debugged - } - }; + /** + * Input any data type port. + */ + public final transient DefaultInputPort<K> data = new DefaultInputPort<K>() + { + @Override + public void process(K tuple) + { + // Does nothing; allows a stream to terminate and therefore be debugged + } + }; } http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/3735316e/library/src/main/java/com/datatorrent/lib/stream/DevNullCounter.java ---------------------------------------------------------------------- diff --git a/library/src/main/java/com/datatorrent/lib/stream/DevNullCounter.java b/library/src/main/java/com/datatorrent/lib/stream/DevNullCounter.java index e8bea13..87093fe 100644 --- a/library/src/main/java/com/datatorrent/lib/stream/DevNullCounter.java +++ b/library/src/main/java/com/datatorrent/lib/stream/DevNullCounter.java @@ -18,14 +18,15 @@ */ package com.datatorrent.lib.stream; -import com.datatorrent.common.util.BaseOperator; -import com.datatorrent.api.DefaultInputPort; -import com.datatorrent.api.Context.OperatorContext; - import javax.validation.constraints.Min; + import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import com.datatorrent.api.Context.OperatorContext; +import com.datatorrent.api.DefaultInputPort; +import com.datatorrent.common.util.BaseOperator; + /** * An implementation of BaseOperator that is used for logging by counting the tuple and then drops it. * <p> @@ -48,141 +49,140 @@ import org.slf4j.LoggerFactory; public class DevNullCounter<K> extends BaseOperator { /** - * Input port that takes objects to be counted in each window. - */ - public final transient DefaultInputPort<K> data = new DefaultInputPort<K>() - { - /** - * Process each tuple. Expects upstream node to compute number of tuples in - * that window and send it as an int<br> - * - * @param tuple - */ - @Override - public void process(K tuple) - { - tuple_count++; - } - }; - private static Logger log = LoggerFactory.getLogger(DevNullCounter.class); - private long windowStartTime = 0; - long[] tuple_numbers = null; - long[] time_numbers = null; - int tuple_index = 0; - int count_denominator = 1; - long count_windowid = 0; - long tuple_count = 1; // so that the first begin window starts the count down + * Input port that takes objects to be counted in each window. + */ + public final transient DefaultInputPort<K> data = new DefaultInputPort<K>() + { + /** + * Process each tuple. Expects upstream node to compute number of tuples in + * that window and send it as an int<br> + * + * @param tuple + */ + @Override + public void process(K tuple) + { + tuple_count++; + } + }; + private static Logger log = LoggerFactory.getLogger(DevNullCounter.class); + private long windowStartTime = 0; + long[] tuple_numbers = null; + long[] time_numbers = null; + int tuple_index = 0; + int count_denominator = 1; + long count_windowid = 0; + long tuple_count = 1; // so that the first begin window starts the count down - private boolean debug = true; + private boolean debug = true; - /** - * getter function for debug state - * - * @return debug state - */ - public boolean getDebug() - { - return debug; - } + /** + * getter function for debug state + * + * @return debug state + */ + public boolean getDebug() + { + return debug; + } - /** - * setter function for debug state - * - * @param i - * sets debug to i - */ - public void setDebug(boolean i) - { - debug = i; - } + /** + * setter function for debug state + * + * @param i + * sets debug to i + */ + public void setDebug(boolean i) + { + debug = i; + } - @Min(1) - private int rollingwindowcount = 1; + @Min(1) + private int rollingwindowcount = 1; - public void setRollingwindowcount(int val) - { - rollingwindowcount = val; - } + public void setRollingwindowcount(int val) + { + rollingwindowcount = val; + } - /** - * Sets up all the config parameters. Assumes checking is done and has passed - * - * @param context - */ - @Override - public void setup(OperatorContext context) - { - windowStartTime = 0; - if (rollingwindowcount != 1) { // Initialized the tuple_numbers - tuple_numbers = new long[rollingwindowcount]; - time_numbers = new long[rollingwindowcount]; - for (int i = tuple_numbers.length; i > 0; i--) { - tuple_numbers[i - 1] = 0; - time_numbers[i - 1] = 0; - } - tuple_index = 0; - } - } + /** + * Sets up all the config parameters. Assumes checking is done and has passed + * + * @param context + */ + @Override + public void setup(OperatorContext context) + { + windowStartTime = 0; + if (rollingwindowcount != 1) { // Initialized the tuple_numbers + tuple_numbers = new long[rollingwindowcount]; + time_numbers = new long[rollingwindowcount]; + for (int i = tuple_numbers.length; i > 0; i--) { + tuple_numbers[i - 1] = 0; + time_numbers[i - 1] = 0; + } + tuple_index = 0; + } + } - @Override - public void beginWindow(long windowId) - { - if (tuple_count != 0) { // Do not restart time if no tuples were sent - windowStartTime = System.currentTimeMillis(); - tuple_count = 0; - } - } + @Override + public void beginWindow(long windowId) + { + if (tuple_count != 0) { // Do not restart time if no tuples were sent + windowStartTime = System.currentTimeMillis(); + tuple_count = 0; + } + } - /** - * convenient method for not sending more than configured number of windows. - */ - @Override - public void endWindow() - { - if (!debug) { - return; - } - if (tuple_count == 0) { - return; - } - long elapsedTime = System.currentTimeMillis() - windowStartTime; - if (elapsedTime == 0) { - elapsedTime = 1; // prevent from / zero - } + /** + * convenient method for not sending more than configured number of windows. + */ + @Override + public void endWindow() + { + if (!debug) { + return; + } + if (tuple_count == 0) { + return; + } + long elapsedTime = System.currentTimeMillis() - windowStartTime; + if (elapsedTime == 0) { + elapsedTime = 1; // prevent from / zero + } - long average; - long tuples_per_sec = (tuple_count * 1000) / elapsedTime; // * 1000 as - // elapsedTime is - // in millis - if (rollingwindowcount == 1) { - average = tuples_per_sec; - } else { // use tuple_numbers - long slots; - if (count_denominator == rollingwindowcount) { - tuple_numbers[tuple_index] = tuple_count; - time_numbers[tuple_index] = elapsedTime; - slots = rollingwindowcount; - tuple_index++; - if (tuple_index == rollingwindowcount) { - tuple_index = 0; - } - } else { - tuple_numbers[count_denominator - 1] = tuple_count; - time_numbers[count_denominator - 1] = elapsedTime; - slots = count_denominator; - count_denominator++; - } - long time_slot = 0; - long numtuples = 0; - for (int i = 0; i < slots; i++) { - numtuples += tuple_numbers[i]; - time_slot += time_numbers[i]; - } - average = (numtuples * 1000) / time_slot; - } - log.debug(String - .format( - "\nWindowid (%d), Time (%d ms): The rate for %d tuples is %d. This window had %d tuples_per_sec ", - count_windowid++, elapsedTime, tuple_count, average, tuples_per_sec)); - } + long average; + long tuples_per_sec = (tuple_count * 1000) / elapsedTime; // * 1000 as + // elapsedTime is + // in millis + if (rollingwindowcount == 1) { + average = tuples_per_sec; + } else { // use tuple_numbers + long slots; + if (count_denominator == rollingwindowcount) { + tuple_numbers[tuple_index] = tuple_count; + time_numbers[tuple_index] = elapsedTime; + slots = rollingwindowcount; + tuple_index++; + if (tuple_index == rollingwindowcount) { + tuple_index = 0; + } + } else { + tuple_numbers[count_denominator - 1] = tuple_count; + time_numbers[count_denominator - 1] = elapsedTime; + slots = count_denominator; + count_denominator++; + } + long time_slot = 0; + long numtuples = 0; + for (int i = 0; i < slots; i++) { + numtuples += tuple_numbers[i]; + time_slot += time_numbers[i]; + } + average = (numtuples * 1000) / time_slot; + } + log.debug(String.format( + "\nWindowid (%d), Time (%d ms): The rate for %d tuples is %d. This window had %d tuples_per_sec ", + count_windowid++, elapsedTime, tuple_count, average, tuples_per_sec)); + } } http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/3735316e/library/src/main/java/com/datatorrent/lib/stream/HashMapToKeyValPair.java ---------------------------------------------------------------------- diff --git a/library/src/main/java/com/datatorrent/lib/stream/HashMapToKeyValPair.java b/library/src/main/java/com/datatorrent/lib/stream/HashMapToKeyValPair.java index 03dba6d..29ce727 100644 --- a/library/src/main/java/com/datatorrent/lib/stream/HashMapToKeyValPair.java +++ b/library/src/main/java/com/datatorrent/lib/stream/HashMapToKeyValPair.java @@ -18,14 +18,15 @@ */ package com.datatorrent.lib.stream; +import java.util.HashMap; +import java.util.Map; + import com.datatorrent.api.DefaultInputPort; import com.datatorrent.api.DefaultOutputPort; import com.datatorrent.api.annotation.OutputPortFieldAnnotation; import com.datatorrent.api.annotation.Stateless; import com.datatorrent.lib.util.BaseKeyValueOperator; import com.datatorrent.lib.util.KeyValPair; -import java.util.HashMap; -import java.util.Map; /** * An implementation of BaseKeyValueOperator that breaks a HashMap tuple into objects. @@ -52,47 +53,47 @@ import java.util.Map; @Stateless public class HashMapToKeyValPair<K, V> extends BaseKeyValueOperator<K, V> { - /** - * Input port that takes a hashmap of <key,value&rt;. - */ - public final transient DefaultInputPort<HashMap<K, V>> data = new DefaultInputPort<HashMap<K, V>>() - { - /** - * Emits key, key/val pair, and val based on port connections - */ - @Override - public void process(HashMap<K, V> tuple) - { - for (Map.Entry<K, V> e : tuple.entrySet()) { - if (key.isConnected()) { - key.emit(cloneKey(e.getKey())); - } - if (val.isConnected()) { - val.emit(cloneValue(e.getValue())); - } - if (keyval.isConnected()) { - keyval.emit(new KeyValPair<K, V>(cloneKey(e.getKey()), cloneValue(e - .getValue()))); - } - } - } - }; + /** + * Input port that takes a hashmap of <key,value&rt;. + */ + public final transient DefaultInputPort<HashMap<K, V>> data = new DefaultInputPort<HashMap<K, V>>() + { + /** + * Emits key, key/val pair, and val based on port connections + */ + @Override + public void process(HashMap<K, V> tuple) + { + for (Map.Entry<K, V> e : tuple.entrySet()) { + if (key.isConnected()) { + key.emit(cloneKey(e.getKey())); + } + if (val.isConnected()) { + val.emit(cloneValue(e.getValue())); + } + if (keyval.isConnected()) { + keyval.emit(new KeyValPair<K, V>(cloneKey(e.getKey()), cloneValue(e + .getValue()))); + } + } + } + }; - /** - * Key output port. - */ - @OutputPortFieldAnnotation(optional = true) - public final transient DefaultOutputPort<K> key = new DefaultOutputPort<K>(); + /** + * Key output port. + */ + @OutputPortFieldAnnotation(optional = true) + public final transient DefaultOutputPort<K> key = new DefaultOutputPort<K>(); - /** - * key/value pair output port. - */ - @OutputPortFieldAnnotation(optional = true) - public final transient DefaultOutputPort<KeyValPair<K, V>> keyval = new DefaultOutputPort<KeyValPair<K, V>>(); + /** + * key/value pair output port. + */ + @OutputPortFieldAnnotation(optional = true) + public final transient DefaultOutputPort<KeyValPair<K, V>> keyval = new DefaultOutputPort<KeyValPair<K, V>>(); - /** - * Value output port. - */ - @OutputPortFieldAnnotation(optional = true) - public final transient DefaultOutputPort<V> val = new DefaultOutputPort<V>(); + /** + * Value output port. + */ + @OutputPortFieldAnnotation(optional = true) + public final transient DefaultOutputPort<V> val = new DefaultOutputPort<V>(); } http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/3735316e/library/src/main/java/com/datatorrent/lib/stream/JsonByteArrayOperator.java ---------------------------------------------------------------------- diff --git a/library/src/main/java/com/datatorrent/lib/stream/JsonByteArrayOperator.java b/library/src/main/java/com/datatorrent/lib/stream/JsonByteArrayOperator.java index 73ece79..b33eadc 100644 --- a/library/src/main/java/com/datatorrent/lib/stream/JsonByteArrayOperator.java +++ b/library/src/main/java/com/datatorrent/lib/stream/JsonByteArrayOperator.java @@ -22,14 +22,14 @@ import java.util.HashMap; import java.util.Iterator; import java.util.Map; -import com.datatorrent.api.annotation.Stateless; import org.codehaus.jettison.json.JSONObject; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import com.datatorrent.common.util.BaseOperator; import com.datatorrent.api.DefaultInputPort; import com.datatorrent.api.DefaultOutputPort; +import com.datatorrent.api.annotation.Stateless; +import com.datatorrent.common.util.BaseOperator; import com.datatorrent.netlet.util.DTThrowable; /** @@ -70,8 +70,7 @@ public class JsonByteArrayOperator extends BaseOperator JSONObject value = jSONObject.optJSONObject(key); if (value == null) { map.put(insertKey, jSONObject.get(key)); - } - else { + } else { getFlatMap(value, map, insertKey); } } @@ -105,8 +104,7 @@ public class JsonByteArrayOperator extends BaseOperator outputFlatMap.emit(flatMap); } - } - catch (Throwable ex) { + } catch (Throwable ex) { DTThrowable.rethrow(ex); } } http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/3735316e/library/src/main/java/com/datatorrent/lib/stream/KeyValPairToHashMap.java ---------------------------------------------------------------------- diff --git a/library/src/main/java/com/datatorrent/lib/stream/KeyValPairToHashMap.java b/library/src/main/java/com/datatorrent/lib/stream/KeyValPairToHashMap.java index 29574e6..dfa3ba2 100644 --- a/library/src/main/java/com/datatorrent/lib/stream/KeyValPairToHashMap.java +++ b/library/src/main/java/com/datatorrent/lib/stream/KeyValPairToHashMap.java @@ -18,12 +18,13 @@ */ package com.datatorrent.lib.stream; +import java.util.HashMap; + import com.datatorrent.api.DefaultInputPort; import com.datatorrent.api.DefaultOutputPort; import com.datatorrent.api.annotation.Stateless; import com.datatorrent.lib.util.BaseKeyValueOperator; import com.datatorrent.lib.util.KeyValPair; -import java.util.HashMap; /** * An implementation of BaseKeyValueOperator that converts Key Value Pair to a HashMap tuple. @@ -45,25 +46,25 @@ import java.util.HashMap; @Stateless public class KeyValPairToHashMap<K, V> extends BaseKeyValueOperator<K, V> { - /** - * Input port that takes a key value pair. - */ - public final transient DefaultInputPort<KeyValPair<K, V>> keyval = new DefaultInputPort<KeyValPair<K, V>>() - { - /** - * Emits key, key/val pair, and val based on port connections - */ - @Override - public void process(KeyValPair<K, V> tuple) - { - HashMap<K, V> otuple = new HashMap<K, V>(1); - otuple.put(tuple.getKey(), tuple.getValue()); - map.emit(otuple); - } - }; + /** + * Input port that takes a key value pair. + */ + public final transient DefaultInputPort<KeyValPair<K, V>> keyval = new DefaultInputPort<KeyValPair<K, V>>() + { + /** + * Emits key, key/val pair, and val based on port connections + */ + @Override + public void process(KeyValPair<K, V> tuple) + { + HashMap<K, V> otuple = new HashMap<K, V>(1); + otuple.put(tuple.getKey(), tuple.getValue()); + map.emit(otuple); + } + }; - /** - * key/value map output port. - */ - public final transient DefaultOutputPort<HashMap<K, V>> map = new DefaultOutputPort<HashMap<K, V>>(); + /** + * key/value map output port. + */ + public final transient DefaultOutputPort<HashMap<K, V>> map = new DefaultOutputPort<HashMap<K, V>>(); } http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/3735316e/library/src/main/java/com/datatorrent/lib/stream/RoundRobinHashMap.java ---------------------------------------------------------------------- diff --git a/library/src/main/java/com/datatorrent/lib/stream/RoundRobinHashMap.java b/library/src/main/java/com/datatorrent/lib/stream/RoundRobinHashMap.java index cc47f5c..aee1213 100644 --- a/library/src/main/java/com/datatorrent/lib/stream/RoundRobinHashMap.java +++ b/library/src/main/java/com/datatorrent/lib/stream/RoundRobinHashMap.java @@ -18,10 +18,11 @@ */ package com.datatorrent.lib.stream; +import java.util.HashMap; + import com.datatorrent.api.DefaultInputPort; import com.datatorrent.api.DefaultOutputPort; import com.datatorrent.lib.util.BaseKeyValueOperator; -import java.util.HashMap; /** * <p> @@ -49,56 +50,56 @@ import java.util.HashMap; */ public class RoundRobinHashMap<K, V> extends BaseKeyValueOperator<K, V> { - /** - * Keys for round robin association. - */ - protected K[] keys; + /** + * Keys for round robin association. + */ + protected K[] keys; - /** - * Current key index. - */ - protected int cursor = 0; + /** + * Current key index. + */ + protected int cursor = 0; - private HashMap<K, V> otuple; + private HashMap<K, V> otuple; - /** - * Value input port. - */ - public final transient DefaultInputPort<V> data = new DefaultInputPort<V>() - { - /** - * Emits key, key/val pair, and val based on port connections - */ - @Override - public void process(V tuple) - { - if (keys.length == 0) { - return; - } - if (cursor == 0) { - otuple = new HashMap<K, V>(); - } - otuple.put(keys[cursor], tuple); - if (++cursor >= keys.length) { - map.emit(otuple); - cursor = 0; - otuple = null; - } - } - }; + /** + * Value input port. + */ + public final transient DefaultInputPort<V> data = new DefaultInputPort<V>() + { + /** + * Emits key, key/val pair, and val based on port connections + */ + @Override + public void process(V tuple) + { + if (keys.length == 0) { + return; + } + if (cursor == 0) { + otuple = new HashMap<K, V>(); + } + otuple.put(keys[cursor], tuple); + if (++cursor >= keys.length) { + map.emit(otuple); + cursor = 0; + otuple = null; + } + } + }; - /** - * key/value map output port. - */ - public final transient DefaultOutputPort<HashMap<K, V>> map = new DefaultOutputPort<HashMap<K, V>>(); + /** + * key/value map output port. + */ + public final transient DefaultOutputPort<HashMap<K, V>> map = new DefaultOutputPort<HashMap<K, V>>(); - /** - * Keys for round robin asspociation, set by application. - * - * @param keys - */ - public void setKeys(K[] keys) - { - this.keys = keys; - } + /** + * Keys for round robin asspociation, set by application. + * + * @param keys + */ + public void setKeys(K[] keys) + { + this.keys = keys; + } } http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/3735316e/library/src/main/java/com/datatorrent/lib/stream/StreamDuplicater.java ---------------------------------------------------------------------- diff --git a/library/src/main/java/com/datatorrent/lib/stream/StreamDuplicater.java b/library/src/main/java/com/datatorrent/lib/stream/StreamDuplicater.java index f13158e..f08b931 100644 --- a/library/src/main/java/com/datatorrent/lib/stream/StreamDuplicater.java +++ b/library/src/main/java/com/datatorrent/lib/stream/StreamDuplicater.java @@ -43,9 +43,9 @@ import com.datatorrent.lib.util.BaseKeyOperator; @Stateless public class StreamDuplicater<K> extends BaseKeyOperator<K> { - /** - * Input data port. - */ + /** + * Input data port. + */ public final transient DefaultInputPort<K> data = new DefaultInputPort<K>() { /** http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/3735316e/library/src/main/java/com/datatorrent/lib/stream/StreamMerger.java ---------------------------------------------------------------------- diff --git a/library/src/main/java/com/datatorrent/lib/stream/StreamMerger.java b/library/src/main/java/com/datatorrent/lib/stream/StreamMerger.java index 5d4cfa6..8678e9f 100644 --- a/library/src/main/java/com/datatorrent/lib/stream/StreamMerger.java +++ b/library/src/main/java/com/datatorrent/lib/stream/StreamMerger.java @@ -18,10 +18,10 @@ */ package com.datatorrent.lib.stream; -import com.datatorrent.common.util.BaseOperator; import com.datatorrent.api.DefaultInputPort; import com.datatorrent.api.DefaultOutputPort; import com.datatorrent.api.annotation.Stateless; +import com.datatorrent.common.util.BaseOperator; /** * An implementation of BaseOperator that merges two streams with identical schema and emits the tuples to the output port in order. @@ -41,10 +41,10 @@ import com.datatorrent.api.annotation.Stateless; @Stateless public class StreamMerger<K> extends BaseOperator { - /** - * Data input port 1. - */ - public final transient DefaultInputPort<K> data1 = new DefaultInputPort<K>() + /** + * Data input port 1. + */ + public final transient DefaultInputPort<K> data1 = new DefaultInputPort<K>() { /** * Emits to port "out" http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/3735316e/library/src/main/java/com/datatorrent/lib/streamquery/AbstractSqlStreamOperator.java ---------------------------------------------------------------------- diff --git a/library/src/main/java/com/datatorrent/lib/streamquery/AbstractSqlStreamOperator.java b/library/src/main/java/com/datatorrent/lib/streamquery/AbstractSqlStreamOperator.java index aefd6cd..e3bba8a 100644 --- a/library/src/main/java/com/datatorrent/lib/streamquery/AbstractSqlStreamOperator.java +++ b/library/src/main/java/com/datatorrent/lib/streamquery/AbstractSqlStreamOperator.java @@ -18,14 +18,14 @@ */ package com.datatorrent.lib.streamquery; -import com.datatorrent.common.util.BaseOperator; +import java.util.ArrayList; +import java.util.HashMap; + import com.datatorrent.api.DefaultInputPort; import com.datatorrent.api.DefaultOutputPort; import com.datatorrent.api.annotation.InputPortFieldAnnotation; import com.datatorrent.api.annotation.OutputPortFieldAnnotation; - -import java.util.ArrayList; -import java.util.HashMap; +import com.datatorrent.common.util.BaseOperator; /** * A base implementation of a BaseOperator that is a sql stream operator. Subclasses should provide the @@ -58,7 +58,8 @@ public abstract class AbstractSqlStreamOperator extends BaseOperator */ public HashMap<String, ColumnInfo> columnInfoMap = new HashMap<String, ColumnInfo>(); - public InputSchema() { + public InputSchema() + { } public InputSchema(String name) http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/3735316e/library/src/main/java/com/datatorrent/lib/streamquery/DeleteOperator.java ---------------------------------------------------------------------- diff --git a/library/src/main/java/com/datatorrent/lib/streamquery/DeleteOperator.java b/library/src/main/java/com/datatorrent/lib/streamquery/DeleteOperator.java index 70e4333..77c7522 100644 --- a/library/src/main/java/com/datatorrent/lib/streamquery/DeleteOperator.java +++ b/library/src/main/java/com/datatorrent/lib/streamquery/DeleteOperator.java @@ -20,9 +20,9 @@ package com.datatorrent.lib.streamquery; import java.util.Map; -import com.datatorrent.common.util.BaseOperator; import com.datatorrent.api.DefaultInputPort; import com.datatorrent.api.DefaultOutputPort; +import com.datatorrent.common.util.BaseOperator; import com.datatorrent.lib.streamquery.condition.Condition; /** http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/3735316e/library/src/main/java/com/datatorrent/lib/streamquery/DerbySqlStreamOperator.java ---------------------------------------------------------------------- diff --git a/library/src/main/java/com/datatorrent/lib/streamquery/DerbySqlStreamOperator.java b/library/src/main/java/com/datatorrent/lib/streamquery/DerbySqlStreamOperator.java index ac05444..2fe8bc3 100644 --- a/library/src/main/java/com/datatorrent/lib/streamquery/DerbySqlStreamOperator.java +++ b/library/src/main/java/com/datatorrent/lib/streamquery/DerbySqlStreamOperator.java @@ -18,15 +18,20 @@ */ package com.datatorrent.lib.streamquery; -import com.datatorrent.api.Context.OperatorContext; -import com.datatorrent.lib.streamquery.AbstractSqlStreamOperator.InputSchema.ColumnInfo; - -import java.sql.*; +import java.sql.Connection; +import java.sql.DriverManager; +import java.sql.PreparedStatement; +import java.sql.ResultSet; +import java.sql.ResultSetMetaData; +import java.sql.SQLException; import java.util.ArrayList; import java.util.HashMap; import java.util.List; import java.util.Map; +import com.datatorrent.api.Context.OperatorContext; +import com.datatorrent.lib.streamquery.AbstractSqlStreamOperator.InputSchema.ColumnInfo; + /** * An implementation of AbstractSqlStreamOperator that provides embedded derby sql input operator. * <p> @@ -38,13 +43,14 @@ import java.util.Map; public class DerbySqlStreamOperator extends AbstractSqlStreamOperator { protected transient ArrayList<PreparedStatement> insertStatements = new ArrayList<PreparedStatement>(5); - protected List<String> execStmtStringList = new ArrayList<String>(); + protected List<String> execStmtStringList = new ArrayList<String>(); protected transient ArrayList<PreparedStatement> execStatements = new ArrayList<PreparedStatement>(5); protected transient ArrayList<PreparedStatement> deleteStatements = new ArrayList<PreparedStatement>(5); protected transient Connection db; - public void addExecStatementString(String stmt) { - this.execStmtStringList.add(stmt); + public void addExecStatementString(String stmt) + { + this.execStmtStringList.add(stmt); } @@ -54,8 +60,7 @@ public class DerbySqlStreamOperator extends AbstractSqlStreamOperator System.setProperty("derby.stream.error.file", "/dev/null"); try { Class.forName("org.apache.derby.jdbc.EmbeddedDriver").newInstance(); - } - catch (Exception ex) { + } catch (Exception ex) { throw new RuntimeException(ex); } @@ -74,7 +79,7 @@ public class DerbySqlStreamOperator extends AbstractSqlStreamOperator String columnNames = ""; String insertQuestionMarks = ""; int j = 0; - for (Map.Entry<String, ColumnInfo> entry: inputSchema.columnInfoMap.entrySet()) { + for (Map.Entry<String, ColumnInfo> entry : inputSchema.columnInfoMap.entrySet()) { if (!columnSpec.isEmpty()) { columnSpec += ","; columnNames += ","; @@ -87,21 +92,22 @@ public class DerbySqlStreamOperator extends AbstractSqlStreamOperator insertQuestionMarks += "?"; entry.getValue().bindIndex = ++j; } - String createTempTableStmt = "DECLARE GLOBAL TEMPORARY TABLE SESSION." + inputSchema.name + "(" + columnSpec + ") NOT LOGGED"; + String createTempTableStmt = + "DECLARE GLOBAL TEMPORARY TABLE SESSION." + inputSchema.name + "(" + columnSpec + ") NOT LOGGED"; st = db.prepareStatement(createTempTableStmt); st.execute(); st.close(); - String insertStmt = "INSERT INTO SESSION." + inputSchema.name + " (" + columnNames + ") VALUES (" + insertQuestionMarks + ")"; + String insertStmt = "INSERT INTO SESSION." + inputSchema.name + " (" + columnNames + ") VALUES (" + + insertQuestionMarks + ")"; insertStatements.add(i, db.prepareStatement(insertStmt)); deleteStatements.add(i, db.prepareStatement("DELETE FROM SESSION." + inputSchema.name)); } - for (String stmtStr: execStmtStringList) { - execStatements.add(db.prepareStatement(stmtStr)); - } - } - catch (SQLException ex) { + for (String stmtStr : execStmtStringList) { + execStatements.add(db.prepareStatement(stmtStr)); + } + } catch (SQLException ex) { throw new RuntimeException(ex); } } @@ -111,8 +117,7 @@ public class DerbySqlStreamOperator extends AbstractSqlStreamOperator { try { db.setAutoCommit(false); - } - catch (SQLException ex) { + } catch (SQLException ex) { throw new RuntimeException(ex); } } @@ -124,18 +129,16 @@ public class DerbySqlStreamOperator extends AbstractSqlStreamOperator PreparedStatement insertStatement = insertStatements.get(tableNum); try { - for (Map.Entry<String, Object> entry: tuple.entrySet()) { + for (Map.Entry<String, Object> entry : tuple.entrySet()) { ColumnInfo t = inputSchema.columnInfoMap.get(entry.getKey()); if (t != null && t.bindIndex != 0) { - //System.out.println("Binding: "+entry.getValue().toString()+" to "+t.bindIndex); insertStatement.setString(t.bindIndex, entry.getValue().toString()); } } insertStatement.executeUpdate(); insertStatement.clearParameters(); - } - catch (SQLException ex) { + } catch (SQLException ex) { throw new RuntimeException(ex); } } @@ -147,48 +150,46 @@ public class DerbySqlStreamOperator extends AbstractSqlStreamOperator db.commit(); if (bindings != null) { for (int i = 0; i < bindings.size(); i++) { - for (PreparedStatement stmt: execStatements) { - stmt.setString(i, bindings.get(i).toString()); - } + for (PreparedStatement stmt : execStatements) { + stmt.setString(i, bindings.get(i).toString()); + } } } - - for (PreparedStatement stmt: execStatements) { - executePreparedStatement(stmt); + for (PreparedStatement stmt : execStatements) { + executePreparedStatement(stmt); } - for (PreparedStatement st: deleteStatements) { + for (PreparedStatement st : deleteStatements) { st.executeUpdate(); st.clearParameters(); } - } - catch (SQLException ex) { + } catch (SQLException ex) { throw new RuntimeException(ex); } bindings = null; } - private void executePreparedStatement(PreparedStatement statement) throws SQLException { - ResultSet res = statement.executeQuery(); - ResultSetMetaData resmeta = res.getMetaData(); - int columnCount = resmeta.getColumnCount(); - while (res.next()) { - HashMap<String, Object> resultRow = new HashMap<String, Object>(); - for (int i = 1; i <= columnCount; i++) { - resultRow.put(resmeta.getColumnName(i), res.getObject(i)); - } - this.result.emit(resultRow); - } - statement.clearParameters(); + private void executePreparedStatement(PreparedStatement statement) throws SQLException + { + ResultSet res = statement.executeQuery(); + ResultSetMetaData resmeta = res.getMetaData(); + int columnCount = resmeta.getColumnCount(); + while (res.next()) { + HashMap<String, Object> resultRow = new HashMap<String, Object>(); + for (int i = 1; i <= columnCount; i++) { + resultRow.put(resmeta.getColumnName(i), res.getObject(i)); + } + this.result.emit(resultRow); } + statement.clearParameters(); + } @Override public void teardown() { try { db.close(); - } - catch (SQLException ex) { + } catch (SQLException ex) { throw new RuntimeException(ex); } } http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/3735316e/library/src/main/java/com/datatorrent/lib/streamquery/GroupByHavingOperator.java ---------------------------------------------------------------------- diff --git a/library/src/main/java/com/datatorrent/lib/streamquery/GroupByHavingOperator.java b/library/src/main/java/com/datatorrent/lib/streamquery/GroupByHavingOperator.java index 63ad18a..1821953 100644 --- a/library/src/main/java/com/datatorrent/lib/streamquery/GroupByHavingOperator.java +++ b/library/src/main/java/com/datatorrent/lib/streamquery/GroupByHavingOperator.java @@ -24,10 +24,10 @@ import java.util.Map; import javax.validation.constraints.NotNull; -import com.datatorrent.common.util.BaseOperator; import com.datatorrent.api.DefaultInputPort; import com.datatorrent.api.DefaultOutputPort; import com.datatorrent.api.annotation.OperatorAnnotation; +import com.datatorrent.common.util.BaseOperator; import com.datatorrent.lib.streamquery.condition.Condition; import com.datatorrent.lib.streamquery.condition.HavingCondition; import com.datatorrent.lib.streamquery.function.FunctionIndex; @@ -100,14 +100,14 @@ public class GroupByHavingOperator extends BaseOperator { columnGroupIndexes.add(index); } + public void addHavingCondition(@NotNull HavingCondition condition) { havingConditions.add(condition); } /** - * @param set - * condition + * @param condition condition */ public void setCondition(Condition condition) { @@ -123,8 +123,9 @@ public class GroupByHavingOperator extends BaseOperator @Override public void process(Map<String, Object> tuple) { - if ((condition != null) && (!condition.isValidRow(tuple))) + if ((condition != null) && (!condition.isValidRow(tuple))) { return; + } rows.add(tuple); } }; @@ -193,8 +194,9 @@ public class GroupByHavingOperator extends BaseOperator return; } } - if (isValidHaving) + if (isValidHaving) { outport.emit(result); + } } rows = new ArrayList<Map<String, Object>>(); @@ -215,13 +217,13 @@ public class GroupByHavingOperator extends BaseOperator @Override public boolean equals(Object other) { - if (other instanceof MultiKeyCompare) - if (compareKeys.size() != ((MultiKeyCompare) other).compareKeys.size()) { + if (other instanceof MultiKeyCompare) { + if (compareKeys.size() != ((MultiKeyCompare)other).compareKeys.size()) { return false; } + } for (int i = 0; i < compareKeys.size(); i++) { - if (!(compareKeys.get(i).equals(((MultiKeyCompare) other).compareKeys - .get(i)))) { + if (!(compareKeys.get(i).equals(((MultiKeyCompare)other).compareKeys.get(i)))) { return false; } } @@ -241,8 +243,9 @@ public class GroupByHavingOperator extends BaseOperator @Override public int compareTo(Object other) { - if (this.equals(other)) + if (this.equals(other)) { return 0; + } return -1; } http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/3735316e/library/src/main/java/com/datatorrent/lib/streamquery/InnerJoinOperator.java ---------------------------------------------------------------------- diff --git a/library/src/main/java/com/datatorrent/lib/streamquery/InnerJoinOperator.java b/library/src/main/java/com/datatorrent/lib/streamquery/InnerJoinOperator.java index f5eafb4..883329e 100644 --- a/library/src/main/java/com/datatorrent/lib/streamquery/InnerJoinOperator.java +++ b/library/src/main/java/com/datatorrent/lib/streamquery/InnerJoinOperator.java @@ -94,38 +94,37 @@ public class InnerJoinOperator implements Operator { table1.add(tuple); for (int j = 0; j < table2.size(); j++) { - if ((joinCondition == null) - || (joinCondition.isValidJoin(tuple, table2.get(j)))) { + if ((joinCondition == null) || (joinCondition.isValidJoin(tuple, table2.get(j)))) { joinRows(tuple, table2.get(j)); } } - } - }; - - /** - * Input port 2 that takes a map of <string,object>. - */ - public final transient DefaultInputPort<Map<String, Object>> inport2 = new DefaultInputPort<Map<String, Object>>() { - @Override - public void process(Map<String, Object> tuple) - { - table2.add(tuple); + } + }; + + /** + * Input port 2 that takes a map of <string,object>. + */ + public final transient DefaultInputPort<Map<String, Object>> inport2 = new DefaultInputPort<Map<String, Object>>() + { + @Override + public void process(Map<String, Object> tuple) + { + table2.add(tuple); for (int j = 0; j < table1.size(); j++) { - if ((joinCondition == null) - || (joinCondition.isValidJoin(table1.get(j), tuple))) { + if ((joinCondition == null) || (joinCondition.isValidJoin(table1.get(j), tuple))) { joinRows(table1.get(j), tuple); } } - } - }; + } + }; - /** - * Output port that emits a map of <string,object>. - */ - public final transient DefaultOutputPort<Map<String, Object>> outport = - new DefaultOutputPort<Map<String, Object>>(); + /** + * Output port that emits a map of <string,object>. + */ + public final transient DefaultOutputPort<Map<String, Object>> outport = + new DefaultOutputPort<Map<String, Object>>(); - @Override + @Override public void setup(OperatorContext arg0) { table1 = new ArrayList<Map<String, Object>>(); @@ -159,7 +158,7 @@ public class InnerJoinOperator implements Operator /** * Pick the supported condition. Currently only equal join is supported. - * @param set joinCondition + * @param joinCondition joinCondition */ public void setJoinCondition(Condition joinCondition) { http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/3735316e/library/src/main/java/com/datatorrent/lib/streamquery/OrderByOperator.java ---------------------------------------------------------------------- diff --git a/library/src/main/java/com/datatorrent/lib/streamquery/OrderByOperator.java b/library/src/main/java/com/datatorrent/lib/streamquery/OrderByOperator.java index ebc5d23..18d9928 100644 --- a/library/src/main/java/com/datatorrent/lib/streamquery/OrderByOperator.java +++ b/library/src/main/java/com/datatorrent/lib/streamquery/OrderByOperator.java @@ -21,9 +21,9 @@ package com.datatorrent.lib.streamquery; import java.util.ArrayList; import java.util.Map; +import com.datatorrent.api.Context.OperatorContext; import com.datatorrent.api.DefaultInputPort; import com.datatorrent.api.DefaultOutputPort; -import com.datatorrent.api.Context.OperatorContext; import com.datatorrent.api.Operator; import com.datatorrent.api.Operator.Unifier; @@ -49,49 +49,49 @@ import com.datatorrent.api.Operator.Unifier; */ public class OrderByOperator implements Operator, Unifier<Map<String, Object>> { - /** - * Order by rules. - */ - ArrayList<OrderByRule<?>> orderByRules = new ArrayList<OrderByRule<?>>(); - - /** - * Descending flag. - */ - private boolean isDescending; - - /** - * collected rows. - */ - private ArrayList<Map<String, Object>> rows; - - /** - * Add order by rule. - */ - public void addOrderByRule(OrderByRule<?> rule) - { - orderByRules.add(rule); - } - - /** + /** + * Order by rules. + */ + ArrayList<OrderByRule<?>> orderByRules = new ArrayList<OrderByRule<?>>(); + + /** + * Descending flag. + */ + private boolean isDescending; + + /** + * collected rows. + */ + private ArrayList<Map<String, Object>> rows; + + /** + * Add order by rule. + */ + public void addOrderByRule(OrderByRule<?> rule) + { + orderByRules.add(rule); + } + + /** * @return isDescending */ public boolean isDescending() { - return isDescending; + return isDescending; } - /** - * @param set isDescending + /** + * @param isDescending isDescending */ public void setDescending(boolean isDescending) { - this.isDescending = isDescending; + this.isDescending = isDescending; } - @Override + @Override public void process(Map<String, Object> tuple) { - rows.add(tuple); + rows.add(tuple); } @Override @@ -103,13 +103,17 @@ public class OrderByOperator implements Operator, Unifier<Map<String, Object>> @Override public void endWindow() { - for (int i=0; i < orderByRules.size(); i++) { + for (int i = 0; i < orderByRules.size(); i++) { rows = orderByRules.get(i).sort(rows); } if (isDescending) { - for (int i=0; i < rows.size(); i++) outport.emit(rows.get(i)); + for (int i = 0; i < rows.size(); i++) { + outport.emit(rows.get(i)); + } } else { - for (int i=rows.size()-1; i >= 0; i--) outport.emit(rows.get(i)); + for (int i = rows.size() - 1; i >= 0; i--) { + outport.emit(rows.get(i)); + } } } @@ -130,7 +134,8 @@ public class OrderByOperator implements Operator, Unifier<Map<String, Object>> /** * Input port that takes a map of <string,object>. */ - public final transient DefaultInputPort<Map<String, Object>> inport = new DefaultInputPort<Map<String, Object>>() { + public final transient DefaultInputPort<Map<String, Object>> inport = new DefaultInputPort<Map<String, Object>>() + { @Override public void process(Map<String, Object> tuple) { @@ -141,18 +146,19 @@ public class OrderByOperator implements Operator, Unifier<Map<String, Object>> /** * Output port that emits a map of <string,object>. */ - public final transient DefaultOutputPort<Map<String, Object>> outport = new DefaultOutputPort<Map<String, Object>>() - { - @Override - public Unifier<Map<String, Object>> getUnifier() { - OrderByOperator unifier = new OrderByOperator(); - for (int i=0; i < getOrderByRules().size(); i++) { - unifier.addOrderByRule(getOrderByRules().get(i)); - } - unifier.setDescending(isDescending); - return unifier; - } - }; + public final transient DefaultOutputPort<Map<String, Object>> outport = new DefaultOutputPort<Map<String, Object>>() + { + @Override + public Unifier<Map<String, Object>> getUnifier() + { + OrderByOperator unifier = new OrderByOperator(); + for (int i = 0; i < getOrderByRules().size(); i++) { + unifier.addOrderByRule(getOrderByRules().get(i)); + } + unifier.setDescending(isDescending); + return unifier; + } + }; /** * @return the orderByRules http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/3735316e/library/src/main/java/com/datatorrent/lib/streamquery/OrderByRule.java ---------------------------------------------------------------------- diff --git a/library/src/main/java/com/datatorrent/lib/streamquery/OrderByRule.java b/library/src/main/java/com/datatorrent/lib/streamquery/OrderByRule.java index 0b16065..8573903 100644 --- a/library/src/main/java/com/datatorrent/lib/streamquery/OrderByRule.java +++ b/library/src/main/java/com/datatorrent/lib/streamquery/OrderByRule.java @@ -58,7 +58,7 @@ public class OrderByRule<T extends Comparable> for (int i = 0; i < rows.size(); i++) { Map<String, Object> row = rows.get(i); if (row.containsKey(columnName)) { - T value = (T) row.get(columnName); + T value = (T)row.get(columnName); ArrayList<Map<String, Object>> list; if (sorted.containsKey(value)) { list = sorted.get(value); http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/3735316e/library/src/main/java/com/datatorrent/lib/streamquery/OuterJoinOperator.java ---------------------------------------------------------------------- diff --git a/library/src/main/java/com/datatorrent/lib/streamquery/OuterJoinOperator.java b/library/src/main/java/com/datatorrent/lib/streamquery/OuterJoinOperator.java index 1e1dcfb..0494bfb 100644 --- a/library/src/main/java/com/datatorrent/lib/streamquery/OuterJoinOperator.java +++ b/library/src/main/java/com/datatorrent/lib/streamquery/OuterJoinOperator.java @@ -88,8 +88,7 @@ public class OuterJoinOperator extends InnerJoinOperator for (int i = 0; i < table2.size(); i++) { boolean merged = false; for (int j = 0; j < table1.size(); j++) { - if ((joinCondition == null) - || (joinCondition.isValidJoin(table1.get(j), table2.get(i)))) { + if ((joinCondition == null) || (joinCondition.isValidJoin(table1.get(j), table2.get(i)))) { merged = true; } } @@ -104,6 +103,7 @@ public class OuterJoinOperator extends InnerJoinOperator { isLeftJoin = true; } + public void setRighttJoin() { isLeftJoin = false; http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/3735316e/library/src/main/java/com/datatorrent/lib/streamquery/SelectFunctionOperator.java ---------------------------------------------------------------------- diff --git a/library/src/main/java/com/datatorrent/lib/streamquery/SelectFunctionOperator.java b/library/src/main/java/com/datatorrent/lib/streamquery/SelectFunctionOperator.java index c1c411c..77616f3 100644 --- a/library/src/main/java/com/datatorrent/lib/streamquery/SelectFunctionOperator.java +++ b/library/src/main/java/com/datatorrent/lib/streamquery/SelectFunctionOperator.java @@ -95,7 +95,9 @@ public class SelectFunctionOperator implements Operator @Override public void endWindow() { - if (functions.size() == 0) return; + if (functions.size() == 0) { + return; + } Map<String, Object> collect = new HashMap<String, Object>(); for (FunctionIndex function : functions) { try { http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/3735316e/library/src/main/java/com/datatorrent/lib/streamquery/SelectOperator.java ---------------------------------------------------------------------- diff --git a/library/src/main/java/com/datatorrent/lib/streamquery/SelectOperator.java b/library/src/main/java/com/datatorrent/lib/streamquery/SelectOperator.java index b02e40f..4dbc1f0 100644 --- a/library/src/main/java/com/datatorrent/lib/streamquery/SelectOperator.java +++ b/library/src/main/java/com/datatorrent/lib/streamquery/SelectOperator.java @@ -22,9 +22,9 @@ import java.util.ArrayList; import java.util.HashMap; import java.util.Map; -import com.datatorrent.common.util.BaseOperator; import com.datatorrent.api.DefaultInputPort; import com.datatorrent.api.DefaultOutputPort; +import com.datatorrent.common.util.BaseOperator; import com.datatorrent.lib.streamquery.condition.Condition; import com.datatorrent.lib.streamquery.index.Index; @@ -89,8 +89,9 @@ public class SelectOperator extends BaseOperator @Override public void process(Map<String, Object> tuple) { - if ((condition != null) && (!condition.isValidRow(tuple))) + if ((condition != null) && (!condition.isValidRow(tuple))) { return; + } if (indexes.size() == 0) { outport.emit(tuple); return; http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/3735316e/library/src/main/java/com/datatorrent/lib/streamquery/SelectTopOperator.java ---------------------------------------------------------------------- diff --git a/library/src/main/java/com/datatorrent/lib/streamquery/SelectTopOperator.java b/library/src/main/java/com/datatorrent/lib/streamquery/SelectTopOperator.java index 365642f..c3ae083 100644 --- a/library/src/main/java/com/datatorrent/lib/streamquery/SelectTopOperator.java +++ b/library/src/main/java/com/datatorrent/lib/streamquery/SelectTopOperator.java @@ -58,7 +58,8 @@ public class SelectTopOperator implements Operator /** * Input port that takes a map of <string,object>. */ - public final transient DefaultInputPort<Map<String, Object>> inport = new DefaultInputPort<Map<String, Object>>() { + public final transient DefaultInputPort<Map<String, Object>> inport = new DefaultInputPort<Map<String, Object>>() + { @Override public void process(Map<String, Object> tuple) { @@ -89,13 +90,13 @@ public class SelectTopOperator implements Operator @Override public void endWindow() { - int numEmits = topValue; - if (isPercentage) { - numEmits = list.size() * (topValue/100); - } - for (int i=0; (i < numEmits)&&(i < list.size()); i++) { - outport.emit(list.get(i)); - } + int numEmits = topValue; + if (isPercentage) { + numEmits = list.size() * (topValue / 100); + } + for (int i = 0; (i < numEmits) && (i < list.size()); i++) { + outport.emit(list.get(i)); + } } public int getTopValue() http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/3735316e/library/src/main/java/com/datatorrent/lib/streamquery/UpdateOperator.java ---------------------------------------------------------------------- diff --git a/library/src/main/java/com/datatorrent/lib/streamquery/UpdateOperator.java b/library/src/main/java/com/datatorrent/lib/streamquery/UpdateOperator.java index e130515..6724a7e 100644 --- a/library/src/main/java/com/datatorrent/lib/streamquery/UpdateOperator.java +++ b/library/src/main/java/com/datatorrent/lib/streamquery/UpdateOperator.java @@ -21,9 +21,9 @@ package com.datatorrent.lib.streamquery; import java.util.HashMap; import java.util.Map; -import com.datatorrent.common.util.BaseOperator; import com.datatorrent.api.DefaultInputPort; import com.datatorrent.api.DefaultOutputPort; +import com.datatorrent.common.util.BaseOperator; import com.datatorrent.lib.streamquery.condition.Condition; /** @@ -54,33 +54,36 @@ public class UpdateOperator extends BaseOperator */ Map<String, Object> updates = new HashMap<String, Object>(); - /** - * condition. - */ - private Condition condition = null; + /** + * condition. + */ + private Condition condition = null; - /** - * set condition. - */ - public void setCondition(Condition condition) - { - this.condition = condition; - } + /** + * set condition. + */ + public void setCondition(Condition condition) + { + this.condition = condition; + } /** * Input port that takes a map of <string,object>. */ - public final transient DefaultInputPort<Map<String, Object>> inport = new DefaultInputPort<Map<String, Object>>() { + public final transient DefaultInputPort<Map<String, Object>> inport = new DefaultInputPort<Map<String, Object>>() + { @Override public void process(Map<String, Object> tuple) { - if ((condition != null)&&(!condition.isValidRow(tuple)))return; + if ((condition != null) && (!condition.isValidRow(tuple))) { + return; + } if (updates.size() == 0) { outport.emit(tuple); return; } Map<String, Object> result = new HashMap<String, Object>(); - for(Map.Entry<String, Object> entry : tuple.entrySet()) { + for (Map.Entry<String, Object> entry : tuple.entrySet()) { if (updates.containsKey(entry.getKey())) { result.put(entry.getKey(), updates.get(entry.getKey())); } else { http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/3735316e/library/src/main/java/com/datatorrent/lib/streamquery/condition/BetweenCondition.java ---------------------------------------------------------------------- diff --git a/library/src/main/java/com/datatorrent/lib/streamquery/condition/BetweenCondition.java b/library/src/main/java/com/datatorrent/lib/streamquery/condition/BetweenCondition.java index efcb62c..43cdc72 100644 --- a/library/src/main/java/com/datatorrent/lib/streamquery/condition/BetweenCondition.java +++ b/library/src/main/java/com/datatorrent/lib/streamquery/condition/BetweenCondition.java @@ -74,11 +74,19 @@ public class BetweenCondition extends Condition @Override public boolean isValidRow(@NotNull Map<String, Object> row) { - if (!row.containsKey(column)) return false; + if (!row.containsKey(column)) { + return false; + } Object value = row.get(column); - if (value == null) return false; - if (((Comparable)value).compareTo((Comparable)leftValue) < 0) return false; - if (((Comparable)value).compareTo((Comparable)rightValue) > 0) return false; + if (value == null) { + return false; + } + if (((Comparable)value).compareTo((Comparable)leftValue) < 0) { + return false; + } + if (((Comparable)value).compareTo((Comparable)rightValue) > 0) { + return false; + } return true; } @@ -88,7 +96,7 @@ public class BetweenCondition extends Condition @Override public boolean isValidJoin(@NotNull Map<String, Object> row1, Map<String, Object> row2) { - assert(false); + assert (false); return false; } http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/3735316e/library/src/main/java/com/datatorrent/lib/streamquery/condition/CompoundCondition.java ---------------------------------------------------------------------- diff --git a/library/src/main/java/com/datatorrent/lib/streamquery/condition/CompoundCondition.java b/library/src/main/java/com/datatorrent/lib/streamquery/condition/CompoundCondition.java index 2caadc6..b4bd3ed 100644 --- a/library/src/main/java/com/datatorrent/lib/streamquery/condition/CompoundCondition.java +++ b/library/src/main/java/com/datatorrent/lib/streamquery/condition/CompoundCondition.java @@ -59,10 +59,12 @@ public class CompoundCondition extends Condition /** * Constructor for logical or metric. + * * @param leftCondition Left validate row condition, must be non null. <br> - * @param rightCondition Right validate row condition, must be non null. <br> + * @param rightCondition Right validate row condition, must be non null. <br> */ - public CompoundCondition(Condition leftCondition, Condition rightCondition) { + public CompoundCondition(Condition leftCondition, Condition rightCondition) + { this.leftCondition = leftCondition; this.rightCondition = rightCondition; } @@ -70,11 +72,13 @@ public class CompoundCondition extends Condition /** * Constructor for logical and metric if logical and parameter is true. * <br> + * * @param leftCondition Left validate row condition, must be non null. <br> - * @param rightCondition Right validate row condition, must be non null. <br> - * @param isLogicalAnd Logical AND if true. + * @param rightCondition Right validate row condition, must be non null. <br> + * @param isLogicalAnd Logical AND if true. */ - public CompoundCondition(Condition leftCondition, Condition rightCondition, boolean isLogicalAnd) { + public CompoundCondition(Condition leftCondition, Condition rightCondition, boolean isLogicalAnd) + { this.leftCondition = leftCondition; this.rightCondition = rightCondition; logicalOr = !isLogicalAnd; @@ -84,7 +88,7 @@ public class CompoundCondition extends Condition public boolean isValidRow(Map<String, Object> row) { if (logicalOr) { - return leftCondition.isValidRow(row) || rightCondition.isValidRow(row); + return leftCondition.isValidRow(row) || rightCondition.isValidRow(row); } else { return leftCondition.isValidRow(row) && rightCondition.isValidRow(row); } @@ -117,7 +121,8 @@ public class CompoundCondition extends Condition this.rightCondition = rightCondition; } - public void setLogicalAnd() { + public void setLogicalAnd() + { this.logicalOr = false; } } http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/3735316e/library/src/main/java/com/datatorrent/lib/streamquery/condition/Condition.java ---------------------------------------------------------------------- diff --git a/library/src/main/java/com/datatorrent/lib/streamquery/condition/Condition.java b/library/src/main/java/com/datatorrent/lib/streamquery/condition/Condition.java index c0a4fde..86d5581 100644 --- a/library/src/main/java/com/datatorrent/lib/streamquery/condition/Condition.java +++ b/library/src/main/java/com/datatorrent/lib/streamquery/condition/Condition.java @@ -31,13 +31,14 @@ import javax.validation.constraints.NotNull; * @tags sql condition, filter * @since 0.3.3 */ -abstract public class Condition +public abstract class Condition { - /** - * Row containing column/value map. - * @return row validation status. - */ - abstract public boolean isValidRow(@NotNull Map<String, Object> row); + /** + * Row containing column/value map. + * + * @return row validation status. + */ + public abstract boolean isValidRow(@NotNull Map<String, Object> row); /** * Filter valid rows only. http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/3735316e/library/src/main/java/com/datatorrent/lib/streamquery/condition/EqualValueCondition.java ---------------------------------------------------------------------- diff --git a/library/src/main/java/com/datatorrent/lib/streamquery/condition/EqualValueCondition.java b/library/src/main/java/com/datatorrent/lib/streamquery/condition/EqualValueCondition.java index fbcb9b0..bb478cf 100644 --- a/library/src/main/java/com/datatorrent/lib/streamquery/condition/EqualValueCondition.java +++ b/library/src/main/java/com/datatorrent/lib/streamquery/condition/EqualValueCondition.java @@ -57,23 +57,28 @@ public class EqualValueCondition extends Condition public boolean isValidRow(Map<String, Object> row) { // no conditions - if (equalMap.size() == 0) + if (equalMap.size() == 0) { return true; + } // compare each condition value for (Map.Entry<String, Object> entry : equalMap.entrySet()) { - if (!row.containsKey(entry.getKey())) + if (!row.containsKey(entry.getKey())) { return false; + } Object value = row.get(entry.getKey()); if (entry.getValue() == null) { - if (value == null) + if (value == null) { return true; + } return false; } - if (value == null) + if (value == null) { return false; - if (!entry.getValue().equals(value)) + } + if (!entry.getValue().equals(value)) { return false; + } } return true; } http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/3735316e/library/src/main/java/com/datatorrent/lib/streamquery/condition/HavingCompareValue.java ---------------------------------------------------------------------- diff --git a/library/src/main/java/com/datatorrent/lib/streamquery/condition/HavingCompareValue.java b/library/src/main/java/com/datatorrent/lib/streamquery/condition/HavingCompareValue.java index b0a3127..7877053 100644 --- a/library/src/main/java/com/datatorrent/lib/streamquery/condition/HavingCompareValue.java +++ b/library/src/main/java/com/datatorrent/lib/streamquery/condition/HavingCompareValue.java @@ -70,8 +70,8 @@ public class HavingCompareValue<T extends Comparable> extends HavingCondition @Override public boolean isValidAggregate(@NotNull ArrayList<Map<String, Object>> rows) throws Exception { - Object computed = aggregateIndex.compute(rows); - return (compareType == compareValue.compareTo(computed)); + Object computed = aggregateIndex.compute(rows); + return (compareType == compareValue.compareTo(computed)); } } http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/3735316e/library/src/main/java/com/datatorrent/lib/streamquery/condition/HavingCondition.java ---------------------------------------------------------------------- diff --git a/library/src/main/java/com/datatorrent/lib/streamquery/condition/HavingCondition.java b/library/src/main/java/com/datatorrent/lib/streamquery/condition/HavingCondition.java index 89451e2..6dac690 100644 --- a/library/src/main/java/com/datatorrent/lib/streamquery/condition/HavingCondition.java +++ b/library/src/main/java/com/datatorrent/lib/streamquery/condition/HavingCondition.java @@ -42,14 +42,15 @@ public abstract class HavingCondition protected FunctionIndex aggregateIndex = null; /** - * @param aggregateIndex Aggregate index to be validated. + * @param aggregateIndex Aggregate index to be validated. */ - public HavingCondition(FunctionIndex aggregateIndex) { + public HavingCondition(FunctionIndex aggregateIndex) + { this.aggregateIndex = aggregateIndex; } /** * Check if aggregate is valid. */ - abstract public boolean isValidAggregate(@NotNull ArrayList<Map<String, Object>> rows) throws Exception; + public abstract boolean isValidAggregate(@NotNull ArrayList<Map<String, Object>> rows) throws Exception; } http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/3735316e/library/src/main/java/com/datatorrent/lib/streamquery/condition/InCondition.java ---------------------------------------------------------------------- diff --git a/library/src/main/java/com/datatorrent/lib/streamquery/condition/InCondition.java b/library/src/main/java/com/datatorrent/lib/streamquery/condition/InCondition.java index 0d5f5c2..236f3b1 100644 --- a/library/src/main/java/com/datatorrent/lib/streamquery/condition/InCondition.java +++ b/library/src/main/java/com/datatorrent/lib/streamquery/condition/InCondition.java @@ -50,16 +50,19 @@ public class InCondition extends Condition private Set<Object> inValues = new HashSet<Object>(); /** - * @param column Column name for which value is checked in values set. + * @param column Column name for which value is checked in values set. */ - public InCondition(@NotNull String column) { + public InCondition(@NotNull String column) + { this.column = column; } @Override public boolean isValidRow(@NotNull Map<String, Object> row) { - if (!row.containsKey(column)) return false; + if (!row.containsKey(column)) { + return false; + } return inValues.contains(row.get(column)); } @@ -79,7 +82,8 @@ public class InCondition extends Condition this.column = column; } - public void addInValue(Object value) { + public void addInValue(Object value) + { this.inValues.add(value); } http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/3735316e/library/src/main/java/com/datatorrent/lib/streamquery/condition/JoinColumnEqualCondition.java ---------------------------------------------------------------------- diff --git a/library/src/main/java/com/datatorrent/lib/streamquery/condition/JoinColumnEqualCondition.java b/library/src/main/java/com/datatorrent/lib/streamquery/condition/JoinColumnEqualCondition.java index f3b829a..d350edc 100644 --- a/library/src/main/java/com/datatorrent/lib/streamquery/condition/JoinColumnEqualCondition.java +++ b/library/src/main/java/com/datatorrent/lib/streamquery/condition/JoinColumnEqualCondition.java @@ -48,7 +48,8 @@ public class JoinColumnEqualCondition extends Condition @NotNull private String column2; - public JoinColumnEqualCondition(@NotNull String column1,@NotNull String column2) { + public JoinColumnEqualCondition(@NotNull String column1, @NotNull String column2) + { this.column1 = column1; this.column2 = column2; } @@ -59,7 +60,7 @@ public class JoinColumnEqualCondition extends Condition @Override public boolean isValidRow(Map<String, Object> row) { - assert(false); + assert (false); return false; } @@ -69,7 +70,9 @@ public class JoinColumnEqualCondition extends Condition @Override public boolean isValidJoin(Map<String, Object> row1, Map<String, Object> row2) { - if (!row1.containsKey(column1) || !row2.containsKey(column2)) return false; + if (!row1.containsKey(column1) || !row2.containsKey(column2)) { + return false; + } Object value1 = row1.get(column1); Object value2 = row2.get(column2); return value1.equals(value2); http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/3735316e/library/src/main/java/com/datatorrent/lib/streamquery/condition/LikeCondition.java ---------------------------------------------------------------------- diff --git a/library/src/main/java/com/datatorrent/lib/streamquery/condition/LikeCondition.java b/library/src/main/java/com/datatorrent/lib/streamquery/condition/LikeCondition.java index f879cd6..b3d7174 100644 --- a/library/src/main/java/com/datatorrent/lib/streamquery/condition/LikeCondition.java +++ b/library/src/main/java/com/datatorrent/lib/streamquery/condition/LikeCondition.java @@ -54,7 +54,8 @@ public class LikeCondition extends Condition * @param column Column to be matched with regular expression, must be non-null. * @param pattern Regular expression pattern, must be non-null. */ - public LikeCondition(@NotNull String column,@NotNull String pattern) { + public LikeCondition(@NotNull String column,@NotNull String pattern) + { setColumn(column); setPattern(pattern); } @@ -66,10 +67,11 @@ public class LikeCondition extends Condition @Override public boolean isValidRow(Map<String, Object> row) { - if (!row.containsKey(column)) return false; - Matcher match = pattern.matcher((CharSequence) row.get(column)); - if (!match.find()) return false; - return true; + if (!row.containsKey(column)) { + return false; + } + Matcher match = pattern.matcher((CharSequence)row.get(column)); + return match.find(); } /** @@ -78,7 +80,7 @@ public class LikeCondition extends Condition @Override public boolean isValidJoin(Map<String, Object> row1, Map<String, Object> row2) { - assert(false); + assert (false); return false; } http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/3735316e/library/src/main/java/com/datatorrent/lib/streamquery/function/AverageFunction.java ---------------------------------------------------------------------- diff --git a/library/src/main/java/com/datatorrent/lib/streamquery/function/AverageFunction.java b/library/src/main/java/com/datatorrent/lib/streamquery/function/AverageFunction.java index 43223d1..e212ff8 100644 --- a/library/src/main/java/com/datatorrent/lib/streamquery/function/AverageFunction.java +++ b/library/src/main/java/com/datatorrent/lib/streamquery/function/AverageFunction.java @@ -55,12 +55,14 @@ public class AverageFunction extends FunctionIndex @Override public Object compute(@NotNull ArrayList<Map<String, Object>> rows) throws Exception { - if (rows.size() == 0) return 0.0; + if (rows.size() == 0) { + return 0.0; + } double sum = 0.0; for (Map<String, Object> row : rows) { sum += ((Number)row.get(column)).doubleValue(); } - return sum/rows.size(); + return sum / rows.size(); } /** @@ -70,7 +72,9 @@ public class AverageFunction extends FunctionIndex @Override protected String aggregateName() { - if (!StringUtils.isEmpty(alias)) return alias; + if (!StringUtils.isEmpty(alias)) { + return alias; + } return "AVG(" + column + ")"; } } http://git-wip-us.apache.org/repos/asf/incubator-apex-malhar/blob/3735316e/library/src/main/java/com/datatorrent/lib/streamquery/function/CountFunction.java ---------------------------------------------------------------------- diff --git a/library/src/main/java/com/datatorrent/lib/streamquery/function/CountFunction.java b/library/src/main/java/com/datatorrent/lib/streamquery/function/CountFunction.java index 350a56a..dafe54e 100644 --- a/library/src/main/java/com/datatorrent/lib/streamquery/function/CountFunction.java +++ b/library/src/main/java/com/datatorrent/lib/streamquery/function/CountFunction.java @@ -57,10 +57,14 @@ public class CountFunction extends FunctionIndex @Override public Object compute(ArrayList<Map<String, Object>> rows) throws Exception { - if (column.equals("*")) return rows.size(); + if (column.equals("*")) { + return rows.size(); + } long count = 0; for (Map<String, Object> row : rows) { - if (row.containsKey(column) && (row.get(column) != null)) count++; + if (row.containsKey(column) && (row.get(column) != null)) { + count++; + } } return count; } @@ -72,7 +76,9 @@ public class CountFunction extends FunctionIndex @Override protected String aggregateName() { - if (!StringUtils.isEmpty(alias)) return alias; + if (!StringUtils.isEmpty(alias)) { + return alias; + } return "COUNT(" + column + ")"; }
