http://git-wip-us.apache.org/repos/asf/eagle/blob/c970bb42/eagle-core/eagle-query/eagle-query-base/src/main/java/org/apache/eagle/query/aggregate/IllegalAggregateFieldTypeException.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-query/eagle-query-base/src/main/java/org/apache/eagle/query/aggregate/IllegalAggregateFieldTypeException.java b/eagle-core/eagle-query/eagle-query-base/src/main/java/org/apache/eagle/query/aggregate/IllegalAggregateFieldTypeException.java index 3e3e739..05f7fb8 100644 --- a/eagle-core/eagle-query/eagle-query-base/src/main/java/org/apache/eagle/query/aggregate/IllegalAggregateFieldTypeException.java +++ b/eagle-core/eagle-query/eagle-query-base/src/main/java/org/apache/eagle/query/aggregate/IllegalAggregateFieldTypeException.java @@ -16,13 +16,14 @@ */ package org.apache.eagle.query.aggregate; -public class IllegalAggregateFieldTypeException extends RuntimeException{ - static final long serialVersionUID = -4548788354899625887L; - public IllegalAggregateFieldTypeException(){ - super(); - } - - public IllegalAggregateFieldTypeException(String message){ - super(message + ", only count and sum are support"); - } +public class IllegalAggregateFieldTypeException extends RuntimeException { + static final long serialVersionUID = -4548788354899625887L; + + public IllegalAggregateFieldTypeException() { + super(); + } + + public IllegalAggregateFieldTypeException(String message) { + super(message + ", only count and sum are support"); + } }
http://git-wip-us.apache.org/repos/asf/eagle/blob/c970bb42/eagle-core/eagle-query/eagle-query-base/src/main/java/org/apache/eagle/query/aggregate/PostAggregateSorting.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-query/eagle-query-base/src/main/java/org/apache/eagle/query/aggregate/PostAggregateSorting.java b/eagle-core/eagle-query/eagle-query-base/src/main/java/org/apache/eagle/query/aggregate/PostAggregateSorting.java index b801255..d27e10e 100644 --- a/eagle-core/eagle-query/eagle-query-base/src/main/java/org/apache/eagle/query/aggregate/PostAggregateSorting.java +++ b/eagle-core/eagle-query/eagle-query-base/src/main/java/org/apache/eagle/query/aggregate/PostAggregateSorting.java @@ -27,75 +27,79 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; public class PostAggregateSorting { - private static final Logger LOG = LoggerFactory.getLogger(PostAggregateSorting.class); - - private static SortedSet<Map.Entry<String, AggregateAPIEntity>> sortByValue(Map<String, AggregateAPIEntity> map, List<AggregateParams.SortFieldOrder> sortedFields) { - SortedSet<Map.Entry<String, AggregateAPIEntity>> sortedEntries = new TreeSet<Map.Entry<String, AggregateAPIEntity>>(new MapKeyValueComparator(sortedFields)); - sortedEntries.addAll(map.entrySet()); - return sortedEntries; - } + private static final Logger LOG = LoggerFactory.getLogger(PostAggregateSorting.class); - /** - * recursively populate sorted list from entity list - * @param entity - */ - public static void sort(AggregateAPIEntity entity, List<AggregateParams.SortFieldOrder> sortFieldOrders){ - // sort should internally add key field to AggregateAPIEntity before the sorting starts as "key" could be sorted against - Map<String, AggregateAPIEntity> children = entity.getEntityList(); - for(Map.Entry<String, AggregateAPIEntity> e : children.entrySet()){ - e.getValue().setKey(e.getKey()); - } - SortedSet<Map.Entry<String, AggregateAPIEntity>> set = sortByValue(children, sortFieldOrders); - for(Map.Entry<String, AggregateAPIEntity> entry : set){ - entity.getSortedList().add(entry.getValue()); - } - for(Map.Entry<String, AggregateAPIEntity> entry : entity.getEntityList().entrySet()){ - sort(entry.getValue(), sortFieldOrders); - } - entity.setEntityList(null); - } + private static SortedSet<Map.Entry<String, AggregateAPIEntity>> sortByValue(Map<String, AggregateAPIEntity> map, List<AggregateParams.SortFieldOrder> sortedFields) { + SortedSet<Map.Entry<String, AggregateAPIEntity>> sortedEntries = new TreeSet<Map.Entry<String, AggregateAPIEntity>>(new MapKeyValueComparator(sortedFields)); + sortedEntries.addAll(map.entrySet()); + return sortedEntries; + } - private static class MapKeyValueComparator implements Comparator<Map.Entry<String, AggregateAPIEntity>>{ - private List<AggregateParams.SortFieldOrder> sortedFieldOrders; - public MapKeyValueComparator(List<AggregateParams.SortFieldOrder> sortedFields){ - this.sortedFieldOrders = sortedFields; - } - @Override - public int compare(Map.Entry<String, AggregateAPIEntity> e1, Map.Entry<String, AggregateAPIEntity> e2){ - int r = 0; - AggregateAPIEntity entity1 = e1.getValue(); - AggregateAPIEntity entity2 = e2.getValue(); - for(AggregateParams.SortFieldOrder sortFieldOrder : sortedFieldOrders){ - // TODO count should not be literal, compare numTotalDescendants - if(sortFieldOrder.getField().equals(AggregateParams.SortFieldOrder.SORT_BY_COUNT)){ - long tmp = entity1.getNumTotalDescendants() - entity2.getNumTotalDescendants(); - r = (tmp == 0) ? 0 : ((tmp > 0) ? 1 : -1); - }else if(sortFieldOrder.getField().equals(AggregateParams.SortFieldOrder.SORT_BY_AGGREGATE_KEY)){ - r = entity1.getKey().compareTo(entity2.getKey()); - }else{ - try{ - String sortedField = sortFieldOrder.getField(); - String tmp1 = sortedField.substring(0, 1).toUpperCase()+sortedField.substring(1); - Method getMethod1 = entity1.getClass().getMethod("get"+tmp1); - Object r1 = getMethod1.invoke(entity1); - Long comp1 = (Long)r1; - String tmp2 = sortedField.substring(0, 1).toUpperCase()+sortedField.substring(1); - Method getMethod2 = entity2.getClass().getMethod("get"+tmp2); - Object r2 = getMethod2.invoke(entity2); - Long comp2 = (Long)r2; - r = comp1.compareTo(comp2); - }catch(Exception ex){ - LOG.error("Can not get corresponding field for sorting", ex); - r = 0; - } - } - if(r == 0) continue; - if(!sortFieldOrder.isAscendant()){ - r = -r; - } - return r; - } - return r; + /** + * recursively populate sorted list from entity list + * @param entity + */ + public static void sort(AggregateAPIEntity entity, List<AggregateParams.SortFieldOrder> sortFieldOrders) { + // sort should internally add key field to AggregateAPIEntity before the sorting starts as "key" could be sorted against + Map<String, AggregateAPIEntity> children = entity.getEntityList(); + for (Map.Entry<String, AggregateAPIEntity> e : children.entrySet()) { + e.getValue().setKey(e.getKey()); } - } + SortedSet<Map.Entry<String, AggregateAPIEntity>> set = sortByValue(children, sortFieldOrders); + for (Map.Entry<String, AggregateAPIEntity> entry : set) { + entity.getSortedList().add(entry.getValue()); + } + for (Map.Entry<String, AggregateAPIEntity> entry : entity.getEntityList().entrySet()) { + sort(entry.getValue(), sortFieldOrders); + } + entity.setEntityList(null); + } + + private static class MapKeyValueComparator implements Comparator<Map.Entry<String, AggregateAPIEntity>> { + private List<AggregateParams.SortFieldOrder> sortedFieldOrders; + + public MapKeyValueComparator(List<AggregateParams.SortFieldOrder> sortedFields) { + this.sortedFieldOrders = sortedFields; + } + + @Override + public int compare(Map.Entry<String, AggregateAPIEntity> e1, Map.Entry<String, AggregateAPIEntity> e2) { + int r = 0; + AggregateAPIEntity entity1 = e1.getValue(); + AggregateAPIEntity entity2 = e2.getValue(); + for (AggregateParams.SortFieldOrder sortFieldOrder : sortedFieldOrders) { + // TODO count should not be literal, compare numTotalDescendants + if (sortFieldOrder.getField().equals(AggregateParams.SortFieldOrder.SORT_BY_COUNT)) { + long tmp = entity1.getNumTotalDescendants() - entity2.getNumTotalDescendants(); + r = (tmp == 0) ? 0 : ((tmp > 0) ? 1 : -1); + } else if (sortFieldOrder.getField().equals(AggregateParams.SortFieldOrder.SORT_BY_AGGREGATE_KEY)) { + r = entity1.getKey().compareTo(entity2.getKey()); + } else { + try { + String sortedField = sortFieldOrder.getField(); + String tmp1 = sortedField.substring(0, 1).toUpperCase() + sortedField.substring(1); + Method getMethod1 = entity1.getClass().getMethod("get" + tmp1); + Object r1 = getMethod1.invoke(entity1); + Long comp1 = (Long)r1; + String tmp2 = sortedField.substring(0, 1).toUpperCase() + sortedField.substring(1); + Method getMethod2 = entity2.getClass().getMethod("get" + tmp2); + Object r2 = getMethod2.invoke(entity2); + Long comp2 = (Long)r2; + r = comp1.compareTo(comp2); + } catch (Exception ex) { + LOG.error("Can not get corresponding field for sorting", ex); + r = 0; + } + } + if (r == 0) { + continue; + } + if (!sortFieldOrder.isAscendant()) { + r = -r; + } + return r; + } + return r; + } + } } http://git-wip-us.apache.org/repos/asf/eagle/blob/c970bb42/eagle-core/eagle-query/eagle-query-base/src/main/java/org/apache/eagle/query/aggregate/SortFieldOrderType.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-query/eagle-query-base/src/main/java/org/apache/eagle/query/aggregate/SortFieldOrderType.java b/eagle-core/eagle-query/eagle-query-base/src/main/java/org/apache/eagle/query/aggregate/SortFieldOrderType.java index 6d47c7f..30a51d6 100644 --- a/eagle-core/eagle-query/eagle-query-base/src/main/java/org/apache/eagle/query/aggregate/SortFieldOrderType.java +++ b/eagle-core/eagle-query/eagle-query-base/src/main/java/org/apache/eagle/query/aggregate/SortFieldOrderType.java @@ -20,40 +20,41 @@ import java.util.regex.Matcher; import java.util.regex.Pattern; public enum SortFieldOrderType { - key("^(key)=(asc|desc)$"), - count("^(count)=(asc|desc)$"), - sum("^sum\\((.*)\\)=(asc|desc)$"), - avg("^avg\\((.*)\\)(asc|desc)$"), - max("^max\\((.*)\\)(asc|desc)$"), - min("^min\\((.*)\\)(asc|desc)$"); - - private Pattern pattern; - private SortFieldOrderType(String patternString){ - this.pattern = Pattern.compile(patternString); - } + key("^(key)=(asc|desc)$"), + count("^(count)=(asc|desc)$"), + sum("^sum\\((.*)\\)=(asc|desc)$"), + avg("^avg\\((.*)\\)(asc|desc)$"), + max("^max\\((.*)\\)(asc|desc)$"), + min("^min\\((.*)\\)(asc|desc)$"); - /** - * This method is thread safe - * match and retrieve back the aggregated fields, for count, aggregateFields can be null - * @param sortFieldOrder - * @return - */ - public SortFieldOrderTypeMatcher matcher(String sortFieldOrder){ - Matcher m = pattern.matcher(sortFieldOrder); - - if(m.find()){ - return new SortFieldOrderTypeMatcher(true, m.group(1), m.group(2)); - }else{ - return new SortFieldOrderTypeMatcher(false, null, null); - } - } - - public static AggregateParams.SortFieldOrder matchAll(String sortFieldOrder){ - for(SortFieldOrderType type : SortFieldOrderType.values()){ - SortFieldOrderTypeMatcher m = type.matcher(sortFieldOrder); - if(m.find()) - return m.sortFieldOrder(); - } - return null; - } + private Pattern pattern; + private SortFieldOrderType(String patternString) { + this.pattern = Pattern.compile(patternString); + } + + /** + * This method is thread safe + * match and retrieve back the aggregated fields, for count, aggregateFields can be null + * @param sortFieldOrder + * @return + */ + public SortFieldOrderTypeMatcher matcher(String sortFieldOrder) { + Matcher m = pattern.matcher(sortFieldOrder); + + if (m.find()) { + return new SortFieldOrderTypeMatcher(true, m.group(1), m.group(2)); + } else { + return new SortFieldOrderTypeMatcher(false, null, null); + } + } + + public static AggregateParams.SortFieldOrder matchAll(String sortFieldOrder) { + for (SortFieldOrderType type : SortFieldOrderType.values()) { + SortFieldOrderTypeMatcher m = type.matcher(sortFieldOrder); + if (m.find()) { + return m.sortFieldOrder(); + } + } + return null; + } } http://git-wip-us.apache.org/repos/asf/eagle/blob/c970bb42/eagle-core/eagle-query/eagle-query-base/src/main/java/org/apache/eagle/query/aggregate/SortFieldOrderTypeMatcher.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-query/eagle-query-base/src/main/java/org/apache/eagle/query/aggregate/SortFieldOrderTypeMatcher.java b/eagle-core/eagle-query/eagle-query-base/src/main/java/org/apache/eagle/query/aggregate/SortFieldOrderTypeMatcher.java index 0b4d408..8ef5c28 100644 --- a/eagle-core/eagle-query/eagle-query-base/src/main/java/org/apache/eagle/query/aggregate/SortFieldOrderTypeMatcher.java +++ b/eagle-core/eagle-query/eagle-query-base/src/main/java/org/apache/eagle/query/aggregate/SortFieldOrderTypeMatcher.java @@ -18,21 +18,21 @@ package org.apache.eagle.query.aggregate; public class SortFieldOrderTypeMatcher { - private boolean matched; - private AggregateParams.SortFieldOrder sortFieldOrder; + private boolean matched; + private AggregateParams.SortFieldOrder sortFieldOrder; - public SortFieldOrderTypeMatcher(boolean matched, String field, String order){ - this.matched = matched; - if(matched){ - this.sortFieldOrder = new AggregateParams.SortFieldOrder(field, order.equals("asc")); - } - } - - public boolean find(){ - return this.matched; - } - - public AggregateParams.SortFieldOrder sortFieldOrder(){ - return this.sortFieldOrder; - } + public SortFieldOrderTypeMatcher(boolean matched, String field, String order) { + this.matched = matched; + if (matched) { + this.sortFieldOrder = new AggregateParams.SortFieldOrder(field, order.equals("asc")); + } + } + + public boolean find() { + return this.matched; + } + + public AggregateParams.SortFieldOrder sortFieldOrder() { + return this.sortFieldOrder; + } } http://git-wip-us.apache.org/repos/asf/eagle/blob/c970bb42/eagle-core/eagle-query/eagle-query-base/src/main/java/org/apache/eagle/query/aggregate/raw/Function.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-query/eagle-query-base/src/main/java/org/apache/eagle/query/aggregate/raw/Function.java b/eagle-core/eagle-query/eagle-query-base/src/main/java/org/apache/eagle/query/aggregate/raw/Function.java index 83c683c..90abf2c 100755 --- a/eagle-core/eagle-query/eagle-query-base/src/main/java/org/apache/eagle/query/aggregate/raw/Function.java +++ b/eagle-core/eagle-query/eagle-query-base/src/main/java/org/apache/eagle/query/aggregate/raw/Function.java @@ -16,91 +16,112 @@ */ package org.apache.eagle.query.aggregate.raw; -public abstract class Function{ - private int count = 0; - protected void incrCount(int num){ count += num; } - public int count(){ return count; } - public abstract void run(double v,int count); - public void run(double v){ run(v,1); } - public abstract double result(); - - public static class Avg extends Function { - private double total; - public Avg(){ - this.total = 0.0; - } - @Override - public void run(double v,int count){ - this.incrCount(count); - total += v; - } - @Override - public double result(){ - return this.total/this.count(); - } - } - - public static class Max extends Function { - private double maximum; - public Max(){ - // TODO is this a bug, or only positive numeric calculation is supported - this.maximum = 0.0; - } - - @Override - public void run(double v,int count){ - this.incrCount(count); - if(v > maximum){ - maximum = v; - } - } - - @Override - public double result(){ - return maximum; - } - } - - public static class Min extends Function { - private double minimum; - public Min(){ - // TODO is this a bug, or only positive numeric calculation is supported - this.minimum = Double.MAX_VALUE; - } - @Override - public void run(double v,int count){ - this.incrCount(count); - if(v < minimum){ - minimum = v; - } - } - - @Override - public double result(){ - return minimum; - } - } - - public static class Sum extends Function { - private double summary; - public Sum(){ - this.summary = 0.0; - } - @Override - public void run(double v,int count){ - this.incrCount(count); - this.summary += v; - } - - @Override - public double result(){ - return this.summary; - } - } - - public static class Count extends Sum{ - public Count(){ - super(); - } - } +public abstract class Function { + private int count = 0; + + protected void incrCount(int num) { + count += num; + } + + public int count() { + return count; + } + + public abstract void run(double v,int count); + + public void run(double v) { + run(v, 1); + } + + public abstract double result(); + + public static class Avg extends Function { + private double total; + + public Avg() { + this.total = 0.0; + } + + @Override + public void run(double v, int count) { + this.incrCount(count); + total += v; + } + + @Override + public double result() { + return this.total / this.count(); + } + } + + public static class Max extends Function { + private double maximum; + + public Max() { + // TODO is this a bug, or only positive numeric calculation is supported + this.maximum = 0.0; + } + + @Override + public void run(double v,int count) { + this.incrCount(count); + if (v > maximum) { + maximum = v; + } + } + + @Override + public double result() { + return maximum; + } + } + + public static class Min extends Function { + private double minimum; + + public Min() { + // TODO is this a bug, or only positive numeric calculation is supported + this.minimum = Double.MAX_VALUE; + } + + @Override + public void run(double v,int count) { + this.incrCount(count); + if (v < minimum) { + minimum = v; + } + } + + @Override + public double result() { + return minimum; + } + } + + public static class Sum extends Function { + private double summary; + + public Sum() { + this.summary = 0.0; + } + + @Override + public void run(double v,int count) { + this.incrCount(count); + this.summary += v; + } + + @Override + public double result() { + return this.summary; + } + } + + public static class Count extends Sum { + + public Count() { + super(); + } + + } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/eagle/blob/c970bb42/eagle-core/eagle-query/eagle-query-base/src/main/java/org/apache/eagle/query/aggregate/raw/FunctionFactory.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-query/eagle-query-base/src/main/java/org/apache/eagle/query/aggregate/raw/FunctionFactory.java b/eagle-core/eagle-query/eagle-query-base/src/main/java/org/apache/eagle/query/aggregate/raw/FunctionFactory.java index c6d1861..4f9330d 100755 --- a/eagle-core/eagle-query/eagle-query-base/src/main/java/org/apache/eagle/query/aggregate/raw/FunctionFactory.java +++ b/eagle-core/eagle-query/eagle-query-base/src/main/java/org/apache/eagle/query/aggregate/raw/FunctionFactory.java @@ -21,55 +21,55 @@ import org.apache.eagle.query.aggregate.AggregateFunctionType; import java.util.HashMap; import java.util.Map; -public abstract class FunctionFactory{ - public abstract Function createFunction(); +public abstract class FunctionFactory { + public abstract Function createFunction(); - public static class AvgFactory extends FunctionFactory { - @Override - public Function createFunction(){ - return new Function.Avg(); - } - } + public static class AvgFactory extends FunctionFactory { + @Override + public Function createFunction() { + return new Function.Avg(); + } + } - public static class MaxFactory extends FunctionFactory { - @Override - public Function createFunction(){ - return new Function.Max(); - } - } + public static class MaxFactory extends FunctionFactory { + @Override + public Function createFunction() { + return new Function.Max(); + } + } - public static class MinFactory extends FunctionFactory { - @Override - public Function createFunction(){ - return new Function.Min(); - } - } + public static class MinFactory extends FunctionFactory { + @Override + public Function createFunction() { + return new Function.Min(); + } + } - public static class CountFactory extends FunctionFactory { - @Override - public Function createFunction(){ - return new Function.Count(); - } - } + public static class CountFactory extends FunctionFactory { + @Override + public Function createFunction() { + return new Function.Count(); + } + } - public static class SumFactory extends FunctionFactory { - @Override - public Function createFunction(){ - return new Function.Sum(); - } - } + public static class SumFactory extends FunctionFactory { + @Override + public Function createFunction() { + return new Function.Sum(); + } + } - public static FunctionFactory locateFunctionFactory(AggregateFunctionType funcType){ - return _functionFactories.get(funcType.name()); - } + public static FunctionFactory locateFunctionFactory(AggregateFunctionType funcType) { + return functionFactories.get(funcType.name()); + } - private static Map<String, FunctionFactory> _functionFactories = new HashMap<String, FunctionFactory>(); - static{ - _functionFactories.put(AggregateFunctionType.count.name(), new CountFactory()); - _functionFactories.put(AggregateFunctionType.sum.name(), new SumFactory()); - _functionFactories.put(AggregateFunctionType.min.name(), new MinFactory()); - _functionFactories.put(AggregateFunctionType.max.name(), new MaxFactory()); - _functionFactories.put(AggregateFunctionType.avg.name(), new AvgFactory()); - } + private static Map<String, FunctionFactory> functionFactories = new HashMap<String, FunctionFactory>(); + + static { + functionFactories.put(AggregateFunctionType.count.name(), new CountFactory()); + functionFactories.put(AggregateFunctionType.sum.name(), new SumFactory()); + functionFactories.put(AggregateFunctionType.min.name(), new MinFactory()); + functionFactories.put(AggregateFunctionType.max.name(), new MaxFactory()); + functionFactories.put(AggregateFunctionType.avg.name(), new AvgFactory()); + } } - \ No newline at end of file http://git-wip-us.apache.org/repos/asf/eagle/blob/c970bb42/eagle-core/eagle-query/eagle-query-base/src/main/java/org/apache/eagle/query/aggregate/raw/GroupbyKey.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-query/eagle-query-base/src/main/java/org/apache/eagle/query/aggregate/raw/GroupbyKey.java b/eagle-core/eagle-query/eagle-query-base/src/main/java/org/apache/eagle/query/aggregate/raw/GroupbyKey.java index c8ed260..b6970d0 100755 --- a/eagle-core/eagle-query/eagle-query-base/src/main/java/org/apache/eagle/query/aggregate/raw/GroupbyKey.java +++ b/eagle-core/eagle-query/eagle-query-base/src/main/java/org/apache/eagle/query/aggregate/raw/GroupbyKey.java @@ -38,112 +38,115 @@ import java.util.ListIterator; * </pre> */ public class GroupbyKey implements Writable { - private final WritableList<BytesWritable> value; + private final WritableList<BytesWritable> value; - public void addValue(byte[] value){ - this.value.add(new BytesWritable(value)); - } - public void addAll(List<BytesWritable> list){ - this.value.addAll(list); - } + public void addValue(byte[] value) { + this.value.add(new BytesWritable(value)); + } - public List<BytesWritable> getValue(){ - return value; - } + public void addAll(List<BytesWritable> list) { + this.value.addAll(list); + } - /** - * empty constructor - */ - public GroupbyKey(){ - this.value = new WritableList<BytesWritable>(BytesWritable.class); - } + public List<BytesWritable> getValue() { + return value; + } - /** - * clear for reuse - */ - public void clear(){ - value.clear(); - } + /** + * empty constructor + */ + public GroupbyKey() { + this.value = new WritableList<BytesWritable>(BytesWritable.class); + } - /** - * copy constructor - * @param key - */ - public GroupbyKey(GroupbyKey key){ - this(); - ListIterator<BytesWritable> it = key.value.listIterator(); -// ListIterator<byte[]> it = key.value.listIterator(); - while(it.hasNext()){ - this.value.add(it.next()); - } - } + /** + * clear for reuse + */ + public void clear() { + value.clear(); + } - public GroupbyKey(List<byte[]> bytes){ - this(); - for(byte[] bt:bytes){ - this.addValue(bt); - } - } + /** + * copy constructor + * @param key + */ + public GroupbyKey(GroupbyKey key) { + this(); + ListIterator<BytesWritable> it = key.value.listIterator(); + // ListIterator<byte[]> it = key.value.listIterator(); + while (it.hasNext()) { + this.value.add(it.next()); + } + } - @Override - public boolean equals(Object obj){ - if(obj == this) - return true; - if(!(obj instanceof GroupbyKey)){ - return false; - } - GroupbyKey that = (GroupbyKey)obj; - ListIterator<BytesWritable> e1 = this.value.listIterator(); - ListIterator<BytesWritable> e2 = that.value.listIterator(); - while(e1.hasNext() && e2.hasNext()){ - if(!Arrays.equals(e1.next().getBytes(), e2.next().getBytes())) - return false; - } - return !(e1.hasNext() || e2.hasNext()); - } + public GroupbyKey(List<byte[]> bytes) { + this(); + for (byte[] bt:bytes) { + this.addValue(bt); + } + } - @Override - public String toString() { - List<String> items = new ArrayList<>(this.value.size()); - ListIterator<BytesWritable> iterator = this.value.listIterator(); - while(iterator.hasNext()){ - items.add(iterator.next().toString()); - } - return String.format("%s(%s)",this.getClass().getSimpleName(),StringUtils.join(items,",")); - } + @Override + public boolean equals(Object obj) { + if (obj == this) { + return true; + } + if (!(obj instanceof GroupbyKey)) { + return false; + } + GroupbyKey that = (GroupbyKey)obj; + ListIterator<BytesWritable> e1 = this.value.listIterator(); + ListIterator<BytesWritable> e2 = that.value.listIterator(); + while (e1.hasNext() && e2.hasNext()) { + if (!Arrays.equals(e1.next().getBytes(), e2.next().getBytes())) { + return false; + } + } + return !(e1.hasNext() || e2.hasNext()); + } - @Override - public int hashCode(){ - ListIterator<BytesWritable> e1 = this.value.listIterator(); - int hash = 0xFFFFFFFF; - while(e1.hasNext()){ - hash ^= Arrays.hashCode(e1.next().getBytes()); - } - return hash; - } + @Override + public String toString() { + List<String> items = new ArrayList<>(this.value.size()); + ListIterator<BytesWritable> iterator = this.value.listIterator(); + while (iterator.hasNext()) { + items.add(iterator.next().toString()); + } + return String.format("%s(%s)",this.getClass().getSimpleName(),StringUtils.join(items,",")); + } - /** - * Serialize the fields of this object to <code>out</code>. - * - * @param out <code>DataOuput</code> to serialize this object into. - * @throws java.io.IOException - */ - @Override - public void write(DataOutput out) throws IOException { - this.value.write(out); - } + @Override + public int hashCode() { + ListIterator<BytesWritable> e1 = this.value.listIterator(); + int hash = 0xFFFFFFFF; + while (e1.hasNext()) { + hash ^= Arrays.hashCode(e1.next().getBytes()); + } + return hash; + } - /** - * Deserialize the fields of this object from <code>in</code>. - * <p/> - * <p>For efficiency, implementations should attempt to re-use storage in the - * existing object where possible.</p> - * - * @param in <code>DataInput</code> to deseriablize this object from. - * @throws java.io.IOException - */ - @Override - public void readFields(DataInput in) throws IOException { - this.value.readFields(in); - } + /** + * Serialize the fields of this object to <code>out</code>. + * + * @param out <code>DataOuput</code> to serialize this object into. + * @throws java.io.IOException + */ + @Override + public void write(DataOutput out) throws IOException { + this.value.write(out); + } + + /** + * Deserialize the fields of this object from <code>in</code>. + * <p/> + * <p>For efficiency, implementations should attempt to re-use storage in the + * existing object where possible.</p> + * + * @param in <code>DataInput</code> to deseriablize this object from. + * @throws java.io.IOException + */ + @Override + public void readFields(DataInput in) throws IOException { + this.value.readFields(in); + } } http://git-wip-us.apache.org/repos/asf/eagle/blob/c970bb42/eagle-core/eagle-query/eagle-query-base/src/main/java/org/apache/eagle/query/aggregate/raw/GroupbyKeyAggregatable.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-query/eagle-query-base/src/main/java/org/apache/eagle/query/aggregate/raw/GroupbyKeyAggregatable.java b/eagle-core/eagle-query/eagle-query-base/src/main/java/org/apache/eagle/query/aggregate/raw/GroupbyKeyAggregatable.java index 7e20029..723df2c 100755 --- a/eagle-core/eagle-query/eagle-query-base/src/main/java/org/apache/eagle/query/aggregate/raw/GroupbyKeyAggregatable.java +++ b/eagle-core/eagle-query/eagle-query-base/src/main/java/org/apache/eagle/query/aggregate/raw/GroupbyKeyAggregatable.java @@ -19,7 +19,7 @@ package org.apache.eagle.query.aggregate.raw; import java.util.List; /** - * The generic interface to unify the GroupbyKeyValue-based results of different + * The generic interface to unify the GroupbyKeyValue-based results of different * business logic aggregates like RawAggregator or TimeSeriesAggregator * * @see org.apache.eagle.query.aggregate.timeseries.TimeSeriesAggregator @@ -29,11 +29,11 @@ import java.util.List; * */ public interface GroupbyKeyAggregatable { - /** - * @see RawAggregator#getGroupbyKeyValues() - * @see org.apache.eagle.query.aggregate.timeseries.TimeSeriesAggregator#getGroupbyKeyValues() - * - * @return - */ - public List<GroupbyKeyValue> getGroupbyKeyValues(); + /** + * @see RawAggregator#getGroupbyKeyValues() + * @see org.apache.eagle.query.aggregate.timeseries.TimeSeriesAggregator#getGroupbyKeyValues() + * + * @return + */ + List<GroupbyKeyValue> getGroupbyKeyValues(); } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/eagle/blob/c970bb42/eagle-core/eagle-query/eagle-query-base/src/main/java/org/apache/eagle/query/aggregate/raw/GroupbyKeyComparator.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-query/eagle-query-base/src/main/java/org/apache/eagle/query/aggregate/raw/GroupbyKeyComparator.java b/eagle-core/eagle-query/eagle-query-base/src/main/java/org/apache/eagle/query/aggregate/raw/GroupbyKeyComparator.java index f976c8c..ad1f755 100755 --- a/eagle-core/eagle-query/eagle-query-base/src/main/java/org/apache/eagle/query/aggregate/raw/GroupbyKeyComparator.java +++ b/eagle-core/eagle-query/eagle-query-base/src/main/java/org/apache/eagle/query/aggregate/raw/GroupbyKeyComparator.java @@ -23,21 +23,25 @@ import java.util.Comparator; import java.util.List; import java.util.ListIterator; -public class GroupbyKeyComparator implements Comparator<GroupbyKey>{ - @Override - public int compare(GroupbyKey key1, GroupbyKey key2){ - List<BytesWritable> list1 = key1.getValue(); - List<BytesWritable> list2 = key2.getValue(); - - if(list1 == null || list2 == null || list1.size() != list2.size()) - throw new IllegalArgumentException("2 list of groupby fields must be non-null and have the same size"); - ListIterator<BytesWritable> e1 = list1.listIterator(); - ListIterator<BytesWritable> e2 = list2.listIterator(); - while(e1.hasNext() && e2.hasNext()){ - int r = Bytes.compareTo(e1.next().copyBytes(), e2.next().copyBytes()); - if(r != 0) - return r; - } - return 0; - } +public class GroupbyKeyComparator implements Comparator<GroupbyKey> { + + @Override + public int compare(GroupbyKey key1, GroupbyKey key2) { + List<BytesWritable> list1 = key1.getValue(); + List<BytesWritable> list2 = key2.getValue(); + + if (list1 == null || list2 == null || list1.size() != list2.size()) { + throw new IllegalArgumentException("2 list of groupby fields must be non-null and have the same size"); + } + ListIterator<BytesWritable> e1 = list1.listIterator(); + ListIterator<BytesWritable> e2 = list2.listIterator(); + while (e1.hasNext() && e2.hasNext()) { + int r = Bytes.compareTo(e1.next().copyBytes(), e2.next().copyBytes()); + if (r != 0) { + return r; + } + } + return 0; + } + } http://git-wip-us.apache.org/repos/asf/eagle/blob/c970bb42/eagle-core/eagle-query/eagle-query-base/src/main/java/org/apache/eagle/query/aggregate/raw/GroupbyKeyValue.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-query/eagle-query-base/src/main/java/org/apache/eagle/query/aggregate/raw/GroupbyKeyValue.java b/eagle-core/eagle-query/eagle-query-base/src/main/java/org/apache/eagle/query/aggregate/raw/GroupbyKeyValue.java index 2256761..8420b11 100755 --- a/eagle-core/eagle-query/eagle-query-base/src/main/java/org/apache/eagle/query/aggregate/raw/GroupbyKeyValue.java +++ b/eagle-core/eagle-query/eagle-query-base/src/main/java/org/apache/eagle/query/aggregate/raw/GroupbyKeyValue.java @@ -36,56 +36,59 @@ import java.io.IOException; * @since : 11/4/14,2014 */ public class GroupbyKeyValue implements Writable { - private GroupbyKey key; - private GroupbyValue value; - public GroupbyKeyValue(){ - this.key = new GroupbyKey(); - this.value = new GroupbyValue(); - } - public GroupbyKeyValue(GroupbyKey key,GroupbyValue value){ - this.key = key; - this.value = value; - } - public GroupbyKey getKey() { - return key; - } + private GroupbyKey key; + private GroupbyValue value; - public void setKey(GroupbyKey key) { - this.key = key; - } + public GroupbyKeyValue() { + this.key = new GroupbyKey(); + this.value = new GroupbyValue(); + } - public GroupbyValue getValue() { - return value; - } + public GroupbyKeyValue(GroupbyKey key,GroupbyValue value) { + this.key = key; + this.value = value; + } - public void setValue(GroupbyValue value) { - this.value = value; - } + public GroupbyKey getKey() { + return key; + } - /** - * Serialize the fields of this object to <code>out</code>. - * - * @param out <code>DataOuput</code> to serialize this object into. - * @throws java.io.IOException - */ - @Override - public void write(DataOutput out) throws IOException { - this.key.write(out); - this.value.write(out); - } + public void setKey(GroupbyKey key) { + this.key = key; + } - /** - * Deserialize the fields of this object from <code>in</code>. - * <p/> - * <p>For efficiency, implementations should attempt to re-use storage in the - * existing object where possible.</p> - * - * @param in <code>DataInput</code> to deseriablize this object from. - * @throws java.io.IOException - */ - @Override - public void readFields(DataInput in) throws IOException { - this.key.readFields(in); - this.value.readFields(in); - } + public GroupbyValue getValue() { + return value; + } + + public void setValue(GroupbyValue value) { + this.value = value; + } + + /** + * Serialize the fields of this object to <code>out</code>. + * + * @param out <code>DataOuput</code> to serialize this object into. + * @throws java.io.IOException + */ + @Override + public void write(DataOutput out) throws IOException { + this.key.write(out); + this.value.write(out); + } + + /** + * Deserialize the fields of this object from <code>in</code>. + * <p/> + * <p>For efficiency, implementations should attempt to re-use storage in the + * existing object where possible.</p> + * + * @param in <code>DataInput</code> to deseriablize this object from. + * @throws java.io.IOException + */ + @Override + public void readFields(DataInput in) throws IOException { + this.key.readFields(in); + this.value.readFields(in); + } } http://git-wip-us.apache.org/repos/asf/eagle/blob/c970bb42/eagle-core/eagle-query/eagle-query-base/src/main/java/org/apache/eagle/query/aggregate/raw/GroupbyKeyValueCreationListener.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-query/eagle-query-base/src/main/java/org/apache/eagle/query/aggregate/raw/GroupbyKeyValueCreationListener.java b/eagle-core/eagle-query/eagle-query-base/src/main/java/org/apache/eagle/query/aggregate/raw/GroupbyKeyValueCreationListener.java index 6ca4bec..e443624 100755 --- a/eagle-core/eagle-query/eagle-query-base/src/main/java/org/apache/eagle/query/aggregate/raw/GroupbyKeyValueCreationListener.java +++ b/eagle-core/eagle-query/eagle-query-base/src/main/java/org/apache/eagle/query/aggregate/raw/GroupbyKeyValueCreationListener.java @@ -20,5 +20,5 @@ package org.apache.eagle.query.aggregate.raw; * @since : 11/11/14,2014 */ public interface GroupbyKeyValueCreationListener { - void keyValueCreated(GroupbyKeyValue kv); + void keyValueCreated(GroupbyKeyValue kv); } http://git-wip-us.apache.org/repos/asf/eagle/blob/c970bb42/eagle-core/eagle-query/eagle-query-base/src/main/java/org/apache/eagle/query/aggregate/raw/GroupbyValue.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-query/eagle-query-base/src/main/java/org/apache/eagle/query/aggregate/raw/GroupbyValue.java b/eagle-core/eagle-query/eagle-query-base/src/main/java/org/apache/eagle/query/aggregate/raw/GroupbyValue.java index b7f2c43..20679b9 100755 --- a/eagle-core/eagle-query/eagle-query-base/src/main/java/org/apache/eagle/query/aggregate/raw/GroupbyValue.java +++ b/eagle-core/eagle-query/eagle-query-base/src/main/java/org/apache/eagle/query/aggregate/raw/GroupbyValue.java @@ -41,99 +41,105 @@ import java.io.IOException; * * @since : 11/4/14,2014 */ -public class GroupbyValue implements Writable{ - private final WritableList<DoubleWritable> value; - private WritableList<BytesWritable> meta; - private int initialCapacity=1; - public GroupbyValue(){ - this(1); - } - /** - * Constructs an empty list with the specified initial capacity. - * - * @param initialCapacity the initial capacity of the list - * @exception IllegalArgumentException if the specified initial capacity - * is negative - */ - public GroupbyValue(int initialCapacity ){ - this.initialCapacity = initialCapacity; - this.value = new WritableList<DoubleWritable>(DoubleWritable.class,this.initialCapacity); - this.meta = new WritableList<BytesWritable>(BytesWritable.class,this.initialCapacity); - } - - public WritableList<DoubleWritable> getValue(){ - return this.value; - } - - public WritableList<BytesWritable> getMeta(){ - return this.meta; - } - - public DoubleWritable get(int index){ - return this.value.get(index); - } - - public BytesWritable getMeta(int index){ - if(this.meta==null) return null; - return this.meta.get(index); - } - - // Values - public void add(DoubleWritable value){ - this.value.add(value); - } - public void add(Double value){ - this.value.add(new DoubleWritable(value)); - } - - public void set(int index,DoubleWritable value){ - this.value.set(index, value); - } - - ////////////// - // Meta - ///////////// - public void addMeta(BytesWritable meta){ - this.meta.add(meta); - } - - public void addMeta(int meta){ - this.meta.add(new BytesWritable(ByteUtil.intToBytes(meta))); - } - - public void setMeta(int index,BytesWritable meta){ - this.meta.set(index,meta); - } - public void setMeta(int index,int meta){ - this.meta.set(index, new BytesWritable(ByteUtil.intToBytes(meta))); - } - - /** - * Serialize the fields of this object to <code>out</code>. - * - * @param out <code>DataOuput</code> to serialize this object into. - * @throws java.io.IOException - */ - @Override - public void write(DataOutput out) throws IOException { - out.writeInt(this.initialCapacity); - this.value.write(out); - this.meta.write(out); - } - - /** - * Deserialize the fields of this object from <code>in</code>. - * <p/> - * <p>For efficiency, implementations should attempt to re-use storage in the - * existing object where possible.</p> - * - * @param in <code>DataInput</code> to deseriablize this object from. - * @throws java.io.IOException - */ - @Override - public void readFields(DataInput in) throws IOException { - this.initialCapacity = in.readInt(); - this.value.readFields(in); - this.meta.readFields(in); - } +public class GroupbyValue implements Writable { + private final WritableList<DoubleWritable> value; + private WritableList<BytesWritable> meta; + private int initialCapacity = 1; + + public GroupbyValue() { + this(1); + } + + /** + * Constructs an empty list with the specified initial capacity. + * + * @param initialCapacity the initial capacity of the list + * @exception IllegalArgumentException if the specified initial capacity + * is negative + */ + public GroupbyValue(int initialCapacity) { + this.initialCapacity = initialCapacity; + this.value = new WritableList<DoubleWritable>(DoubleWritable.class,this.initialCapacity); + this.meta = new WritableList<BytesWritable>(BytesWritable.class,this.initialCapacity); + } + + public WritableList<DoubleWritable> getValue() { + return this.value; + } + + public DoubleWritable get(int index) { + return this.value.get(index); + } + + public WritableList<BytesWritable> getMeta() { + return this.meta; + } + + public BytesWritable getMeta(int index) { + if (this.meta == null) { + return null; + } + return this.meta.get(index); + } + + // Values + public void add(DoubleWritable value) { + this.value.add(value); + } + + public void add(Double value) { + this.value.add(new DoubleWritable(value)); + } + + public void set(int index,DoubleWritable value) { + this.value.set(index, value); + } + + ////////////// + // Meta + ///////////// + public void addMeta(BytesWritable meta) { + this.meta.add(meta); + } + + public void addMeta(int meta) { + this.meta.add(new BytesWritable(ByteUtil.intToBytes(meta))); + } + + public void setMeta(int index,BytesWritable meta) { + this.meta.set(index,meta); + } + + public void setMeta(int index,int meta) { + this.meta.set(index, new BytesWritable(ByteUtil.intToBytes(meta))); + } + + /** + * Serialize the fields of this object to <code>out</code>. + * + * @param out <code>DataOuput</code> to serialize this object into. + * @throws java.io.IOException + */ + @Override + public void write(DataOutput out) throws IOException { + out.writeInt(this.initialCapacity); + this.value.write(out); + this.meta.write(out); + } + + /** + * Deserialize the fields of this object from <code>in</code>. + * <p/> + * <p>For efficiency, implementations should attempt to re-use storage in the + * existing object where possible.</p> + * + * @param in <code>DataInput</code> to deseriablize this object from. + * @throws java.io.IOException + */ + @Override + public void readFields(DataInput in) throws IOException { + this.initialCapacity = in.readInt(); + this.value.readFields(in); + this.meta.readFields(in); + } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/eagle/blob/c970bb42/eagle-core/eagle-query/eagle-query-base/src/main/java/org/apache/eagle/query/aggregate/raw/RawAggregator.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-query/eagle-query-base/src/main/java/org/apache/eagle/query/aggregate/raw/RawAggregator.java b/eagle-core/eagle-query/eagle-query-base/src/main/java/org/apache/eagle/query/aggregate/raw/RawAggregator.java index 0468074..5883b20 100755 --- a/eagle-core/eagle-query/eagle-query-base/src/main/java/org/apache/eagle/query/aggregate/raw/RawAggregator.java +++ b/eagle-core/eagle-query/eagle-query-base/src/main/java/org/apache/eagle/query/aggregate/raw/RawAggregator.java @@ -25,47 +25,47 @@ import java.util.ListIterator; import java.util.Map; public class RawAggregator implements QualifierCreationListener,GroupbyKeyAggregatable { - private List<String> groupbyFields; - private GroupbyKey key; - private static final byte[] UNASSIGNED = "unassigned".getBytes(); - private RawGroupbyBucket bucket; + private List<String> groupbyFields; + private GroupbyKey key; + private static final byte[] UNASSIGNED = "unassigned".getBytes(); + private RawGroupbyBucket bucket; - public RawAggregator(List<String> groupbyFields, List<AggregateFunctionType> aggregateFunctionTypes, List<String> aggregatedFields, EntityDefinition ed){ - this.groupbyFields = groupbyFields; - key = new GroupbyKey(); - bucket = new RawGroupbyBucket(aggregateFunctionTypes, aggregatedFields, ed); - } + public RawAggregator(List<String> groupbyFields, List<AggregateFunctionType> aggregateFunctionTypes, List<String> aggregatedFields, EntityDefinition ed) { + this.groupbyFields = groupbyFields; + key = new GroupbyKey(); + bucket = new RawGroupbyBucket(aggregateFunctionTypes, aggregatedFields, ed); + } - @Override - public void qualifierCreated(Map<String, byte[]> qualifiers){ - key.clear(); - ListIterator<String> it = groupbyFields.listIterator(); - while(it.hasNext()){ - byte[] groupbyFieldValue = qualifiers.get(it.next()); - if(groupbyFieldValue == null){ - key.addValue(UNASSIGNED); - }else{ - key.addValue(groupbyFieldValue); - } - } - GroupbyKey newKey = null; - if(bucket.exists(key)){ - newKey = key; - }else{ - newKey = new GroupbyKey(key); - } - - bucket.addDatapoint(newKey, qualifiers); - } + @Override + public void qualifierCreated(Map<String, byte[]> qualifiers) { + key.clear(); + ListIterator<String> it = groupbyFields.listIterator(); + while (it.hasNext()) { + byte[] groupbyFieldValue = qualifiers.get(it.next()); + if (groupbyFieldValue == null) { + key.addValue(UNASSIGNED); + } else { + key.addValue(groupbyFieldValue); + } + } + GroupbyKey newKey = null; + if (bucket.exists(key)) { + newKey = key; + } else { + newKey = new GroupbyKey(key); + } - /** - * @return - */ - public Map<List<String>, List<Double>> result(){ - return bucket.result(); - } + bucket.addDatapoint(newKey, qualifiers); + } - public List<GroupbyKeyValue> getGroupbyKeyValues(){ - return bucket.groupbyKeyValues(); - } + /** + * @return + */ + public Map<List<String>, List<Double>> result() { + return bucket.result(); + } + + public List<GroupbyKeyValue> getGroupbyKeyValues() { + return bucket.groupbyKeyValues(); + } } http://git-wip-us.apache.org/repos/asf/eagle/blob/c970bb42/eagle-core/eagle-query/eagle-query-base/src/main/java/org/apache/eagle/query/aggregate/raw/RawGroupbyBucket.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-query/eagle-query-base/src/main/java/org/apache/eagle/query/aggregate/raw/RawGroupbyBucket.java b/eagle-core/eagle-query/eagle-query-base/src/main/java/org/apache/eagle/query/aggregate/raw/RawGroupbyBucket.java index 47b84a0..b0aa79c 100755 --- a/eagle-core/eagle-query/eagle-query-base/src/main/java/org/apache/eagle/query/aggregate/raw/RawGroupbyBucket.java +++ b/eagle-core/eagle-query/eagle-query-base/src/main/java/org/apache/eagle/query/aggregate/raw/RawGroupbyBucket.java @@ -30,167 +30,174 @@ import org.slf4j.LoggerFactory; import java.util.*; public class RawGroupbyBucket { - private final static Logger LOG = LoggerFactory.getLogger(RawGroupbyBucket.class); + private static final Logger LOG = LoggerFactory.getLogger(RawGroupbyBucket.class); - private List<String> aggregatedFields; - private EntityDefinition entityDefinition; + private List<String> aggregatedFields; + private EntityDefinition entityDefinition; - - private List<AggregateFunctionType> types; - private SortedMap<GroupbyKey, List<Function>> group2FunctionMap = - new TreeMap<GroupbyKey, List<Function>>(new GroupbyKeyComparator()); - public RawGroupbyBucket(List<AggregateFunctionType> types, List<String> aggregatedFields, EntityDefinition ed){ - this.types = types; - this.aggregatedFields = aggregatedFields; - this.entityDefinition = ed; - } + private List<AggregateFunctionType> types; + private SortedMap<GroupbyKey, List<Function>> group2FunctionMap = + new TreeMap<GroupbyKey, List<Function>>(new GroupbyKeyComparator()); - public boolean exists(GroupbyKey key){ - return group2FunctionMap.containsKey(key); - } + public RawGroupbyBucket(List<AggregateFunctionType> types, List<String> aggregatedFields, EntityDefinition ed) { + this.types = types; + this.aggregatedFields = aggregatedFields; + this.entityDefinition = ed; + } - public void addDatapoint(GroupbyKey groupbyKey, Map<String, byte[]> values){ - // locate groupby bucket - List<Function> functions = group2FunctionMap.get(groupbyKey); - if(functions == null){ - functions = new ArrayList<Function>(); - for(AggregateFunctionType type : types){ - FunctionFactory ff = FunctionFactory.locateFunctionFactory(type); - if(ff == null){ - LOG.error("FunctionFactory of AggregationFunctionType:"+type+" is null"); - }else{ - functions.add(ff.createFunction()); - } - } - group2FunctionMap.put(groupbyKey, functions); - } - ListIterator<Function> e1 = functions.listIterator(); - ListIterator<String> e2 = aggregatedFields.listIterator(); - while(e1.hasNext() && e2.hasNext()){ - Function f = e1.next(); - String aggregatedField = e2.next(); - byte[] v = values.get(aggregatedField); - if(f instanceof Function.Count){ // handle count - if(entityDefinition.getMetricDefinition()==null) { - f.run(1.0); - continue; - }else if(v == null){ - aggregatedField = GenericMetricEntity.VALUE_FIELD; - v = values.get(aggregatedField); - } - } - if(v != null){ - Qualifier q = entityDefinition.getDisplayNameMap().get(aggregatedField); - EntitySerDeser<?> serDeser = q.getSerDeser(); - // double d = 0.0; - if(serDeser instanceof IntSerDeser){ - double d= (Integer)serDeser.deserialize(v); - f.run(d); - }else if(serDeser instanceof LongSerDeser){ - double d = (Long)serDeser.deserialize(v); - f.run(d); - }else if(serDeser instanceof DoubleSerDeser){ - double d = (Double)serDeser.deserialize(v); - f.run(d); - // TODO: support numeric array type that is not metric - }else if(serDeser instanceof DoubleArraySerDeser){ - double[] d = ((DoubleArraySerDeser) serDeser).deserialize(v); - if(f instanceof Function.Count){ - f.run(d.length); - } else { - for(double i:d) f.run(i); - } - }else if(serDeser instanceof IntArraySerDeser){ - int[] d = ((IntArraySerDeser) serDeser).deserialize(v); - if(f instanceof Function.Count){ - f.run(d.length); - }else{ - for(int i:d) f.run(i); - } - }else{ - if(LOG.isDebugEnabled()) LOG.debug("EntitySerDeser of field "+aggregatedField+" is not IntSerDeser or LongSerDeser or DoubleSerDeser or IntArraySerDeser or DoubleArraySerDeser, default as 0.0"); - } - }else if(TokenConstant.isExpression(aggregatedField)){ - String expression = TokenConstant.parseExpressionContent(aggregatedField); - try { - Map<String,Double> doubleMap = EntityQualifierUtils.bytesMapToDoubleMap(values, entityDefinition); - if(entityDefinition.getMetricDefinition() == null) { - double value = ExpressionParser.eval(expression,doubleMap); - // LOG.info("DEBUG: Eval "+expression +" = "+value); - f.run(value); - }else{ - Qualifier qualifier = entityDefinition.getDisplayNameMap().get(GenericMetricEntity.VALUE_FIELD); - EntitySerDeser _serDeser = qualifier.getSerDeser(); - byte[] valueBytes = values.get(GenericMetricEntity.VALUE_FIELD); - if( _serDeser instanceof DoubleArraySerDeser){ - double[] d = (double[]) _serDeser.deserialize(valueBytes); - if(f instanceof Function.Count) { - f.run(d.length); - }else{ - for(double i:d){ - doubleMap.put(GenericMetricEntity.VALUE_FIELD,i); - f.run(ExpressionParser.eval(expression, doubleMap)); - } - } - }else if(_serDeser instanceof IntArraySerDeser){ - int[] d = (int[]) _serDeser.deserialize(valueBytes); - if(f instanceof Function.Count) { - f.run(d.length); - }else { - for (double i : d) { - doubleMap.put(GenericMetricEntity.VALUE_FIELD, i); - f.run(ExpressionParser.eval(expression, doubleMap)); - } - } - }else{ - double value = ExpressionParser.eval(expression,doubleMap); - f.run(value); - } - } - } catch (Exception e) { - LOG.error("Got exception to evaluate expression: "+expression+", exception: "+e.getMessage(),e); - } - } - } - } + public boolean exists(GroupbyKey key) { + return group2FunctionMap.containsKey(key); + } - /** - * expensive operation - create objects and format the result - * @return - */ - public List<GroupbyKeyValue> groupbyKeyValues(){ - List<GroupbyKeyValue> results = new ArrayList<GroupbyKeyValue>(); - for(Map.Entry<GroupbyKey, List<Function>> entry : this.group2FunctionMap.entrySet()){ - GroupbyValue value = new GroupbyValue(); - for(Function f : entry.getValue()){ - value.add(new DoubleWritable(f.result())); - value.addMeta(f.count()); - } - results.add(new GroupbyKeyValue(entry.getKey(),value)); - } - return results; - } + public void addDatapoint(GroupbyKey groupbyKey, Map<String, byte[]> values) { + // locate groupby bucket + List<Function> functions = group2FunctionMap.get(groupbyKey); + if (functions == null) { + functions = new ArrayList<Function>(); + for (AggregateFunctionType type : types) { + FunctionFactory ff = FunctionFactory.locateFunctionFactory(type); + if (ff == null) { + LOG.error("FunctionFactory of AggregationFunctionType:" + type + " is null"); + } else { + functions.add(ff.createFunction()); + } + } + group2FunctionMap.put(groupbyKey, functions); + } + ListIterator<Function> e1 = functions.listIterator(); + ListIterator<String> e2 = aggregatedFields.listIterator(); + while (e1.hasNext() && e2.hasNext()) { + Function f = e1.next(); + String aggregatedField = e2.next(); + byte[] v = values.get(aggregatedField); + if (f instanceof Function.Count) { // handle count + if (entityDefinition.getMetricDefinition() == null) { + f.run(1.0); + continue; + } else if (v == null) { + aggregatedField = GenericMetricEntity.VALUE_FIELD; + v = values.get(aggregatedField); + } + } + if (v != null) { + Qualifier q = entityDefinition.getDisplayNameMap().get(aggregatedField); + EntitySerDeser<?> serDeser = q.getSerDeser(); + // double d = 0.0; + if (serDeser instanceof IntSerDeser) { + double d = (Integer)serDeser.deserialize(v); + f.run(d); + } else if (serDeser instanceof LongSerDeser) { + double d = (Long)serDeser.deserialize(v); + f.run(d); + } else if (serDeser instanceof DoubleSerDeser) { + double d = (Double)serDeser.deserialize(v); + f.run(d); + // TODO: support numeric array type that is not metric + } else if (serDeser instanceof DoubleArraySerDeser) { + double[] d = ((DoubleArraySerDeser) serDeser).deserialize(v); + if (f instanceof Function.Count) { + f.run(d.length); + } else { + for (double i:d) { + f.run(i); + } + } + } else if (serDeser instanceof IntArraySerDeser) { + int[] d = ((IntArraySerDeser) serDeser).deserialize(v); + if (f instanceof Function.Count) { + f.run(d.length); + } else { + for (int i:d) { + f.run(i); + } + } + } else { + if (LOG.isDebugEnabled()) { + LOG.debug("EntitySerDeser of field " + aggregatedField + + " is not IntSerDeser or LongSerDeser or DoubleSerDeser or IntArraySerDeser or DoubleArraySerDeser, default as 0.0"); + } + } + } else if (TokenConstant.isExpression(aggregatedField)) { + String expression = TokenConstant.parseExpressionContent(aggregatedField); + try { + Map<String,Double> doubleMap = EntityQualifierUtils.bytesMapToDoubleMap(values, entityDefinition); + if (entityDefinition.getMetricDefinition() == null) { + double value = ExpressionParser.eval(expression,doubleMap); + // LOG.info("DEBUG: Eval "+expression +" = "+value); + f.run(value); + } else { + Qualifier qualifier = entityDefinition.getDisplayNameMap().get(GenericMetricEntity.VALUE_FIELD); + EntitySerDeser _serDeser = qualifier.getSerDeser(); + byte[] valueBytes = values.get(GenericMetricEntity.VALUE_FIELD); + if ( _serDeser instanceof DoubleArraySerDeser) { + double[] d = (double[]) _serDeser.deserialize(valueBytes); + if (f instanceof Function.Count) { + f.run(d.length); + } else { + for (double i:d) { + doubleMap.put(GenericMetricEntity.VALUE_FIELD,i); + f.run(ExpressionParser.eval(expression, doubleMap)); + } + } + } else if (_serDeser instanceof IntArraySerDeser) { + int[] d = (int[]) _serDeser.deserialize(valueBytes); + if (f instanceof Function.Count) { + f.run(d.length); + } else { + for (double i : d) { + doubleMap.put(GenericMetricEntity.VALUE_FIELD, i); + f.run(ExpressionParser.eval(expression, doubleMap)); + } + } + } else { + double value = ExpressionParser.eval(expression,doubleMap); + f.run(value); + } + } + } catch (Exception e) { + LOG.error("Got exception to evaluate expression: " + expression + ", exception: " + e.getMessage(), e); + } + } + } + } - /** - * expensive operation - create objects and format the result - * @return - */ - public Map<List<String>, List<Double>> result(){ - Map<List<String>, List<Double>> result = new HashMap<List<String>, List<Double>>(); - for(Map.Entry<GroupbyKey, List<Function>> entry : this.group2FunctionMap.entrySet()){ - List<Double> values = new ArrayList<Double>(); - for(Function f : entry.getValue()){ - values.add(f.result()); - } - GroupbyKey key = entry.getKey(); - List<BytesWritable> list1 = key.getValue(); - List<String> list2 = new ArrayList<String>(); - for(BytesWritable e : list1){ - list2.add(new String(e.copyBytes())); - } - result.put(list2, values); - } - return result; - } + /** + * expensive operation - create objects and format the result + * @return + */ + public List<GroupbyKeyValue> groupbyKeyValues() { + List<GroupbyKeyValue> results = new ArrayList<GroupbyKeyValue>(); + for (Map.Entry<GroupbyKey, List<Function>> entry : this.group2FunctionMap.entrySet()) { + GroupbyValue value = new GroupbyValue(); + for (Function f : entry.getValue()) { + value.add(new DoubleWritable(f.result())); + value.addMeta(f.count()); + } + results.add(new GroupbyKeyValue(entry.getKey(),value)); + } + return results; + } + + /** + * expensive operation - create objects and format the result + * @return + */ + public Map<List<String>, List<Double>> result() { + Map<List<String>, List<Double>> result = new HashMap<List<String>, List<Double>>(); + for (Map.Entry<GroupbyKey, List<Function>> entry : this.group2FunctionMap.entrySet()) { + List<Double> values = new ArrayList<Double>(); + for (Function f : entry.getValue()) { + values.add(f.result()); + } + GroupbyKey key = entry.getKey(); + List<BytesWritable> list1 = key.getValue(); + List<String> list2 = new ArrayList<String>(); + for (BytesWritable e : list1) { + list2.add(new String(e.copyBytes())); + } + result.put(list2, values); + } + return result; + } } http://git-wip-us.apache.org/repos/asf/eagle/blob/c970bb42/eagle-core/eagle-query/eagle-query-base/src/main/java/org/apache/eagle/query/aggregate/raw/WritableList.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-query/eagle-query-base/src/main/java/org/apache/eagle/query/aggregate/raw/WritableList.java b/eagle-core/eagle-query/eagle-query-base/src/main/java/org/apache/eagle/query/aggregate/raw/WritableList.java index f9932a5..f3d6afd 100755 --- a/eagle-core/eagle-query/eagle-query-base/src/main/java/org/apache/eagle/query/aggregate/raw/WritableList.java +++ b/eagle-core/eagle-query/eagle-query-base/src/main/java/org/apache/eagle/query/aggregate/raw/WritableList.java @@ -27,78 +27,75 @@ import java.util.ArrayList; /** * @since : 11/6/14,2014 */ -public class WritableList<E extends Writable> extends ArrayList<E> implements Writable{ - private Class<E> itemTypeClass; +public class WritableList<E extends Writable> extends ArrayList<E> implements Writable { + private Class<E> itemTypeClass; - public WritableList(Class<E> typeClass){ - this.itemTypeClass = typeClass; - } + public WritableList(Class<E> typeClass) { + this.itemTypeClass = typeClass; + } - public WritableList(Class<E> typeClass,int initialCapacity){ - super(initialCapacity); - this.itemTypeClass = typeClass; - } + public WritableList(Class<E> typeClass,int initialCapacity) { + super(initialCapacity); + this.itemTypeClass = typeClass; + } - /** - * <h3> Get item class by </h3> - * <pre> - * (Class<E>) ((ParameterizedType)getClass().getGenericSuperclass()).getActualTypeArguments()[0]; - * </pre> - */ - @Deprecated - public WritableList(){ - this.itemTypeClass = (Class<E>) ((ParameterizedType)getClass().getGenericSuperclass()).getActualTypeArguments()[0]; - } + /** + * <h3> Get item class by </h3> + */ + @Deprecated + public WritableList() { + this.itemTypeClass = (Class<E>) ((ParameterizedType)getClass().getGenericSuperclass()).getActualTypeArguments()[0]; + } - private void check() throws IOException{ - if(this.itemTypeClass == null){ - throw new IOException("Class Type of WritableArrayList<E extends Writable> is null"); - } - } + private void check() throws IOException { + if (this.itemTypeClass == null) { + throw new IOException("Class Type of WritableArrayList<E extends Writable> is null"); + } + } - public Class<E> getItemClass(){ - return itemTypeClass; - } + public Class<E> getItemClass() { + return itemTypeClass; + } - /** - * Serialize the fields of this object to <code>out</code>. - * - * @param out <code>DataOuput</code> to serialize this object into. - * @throws java.io.IOException - */ - @Override - public void write(DataOutput out) throws IOException { - this.check(); - out.writeInt(this.size()); - for(Writable item: this){ - item.write(out); - } - } + /** + * Serialize the fields of this object to <code>out</code>. + * + * @param out <code>DataOuput</code> to serialize this object into. + * @throws java.io.IOException + */ + @Override + public void write(DataOutput out) throws IOException { + this.check(); + out.writeInt(this.size()); + for (Writable item: this) { + item.write(out); + } + } - /** - * Deserialize the fields of this object from <code>in</code>. - * <p/> - * <p>For efficiency, implementations should attempt to re-use storage in the - * existing object where possible.</p> - * - * @param in <code>DataInput</code> to deseriablize this object from. - * @throws java.io.IOException - */ - @Override - public void readFields(DataInput in) throws IOException { - this.check(); - int size = in.readInt(); - for(int i=0;i<size;i++){ - try { - E item = itemTypeClass.newInstance(); - item.readFields(in); - this.add(item); - } catch (InstantiationException e) { - throw new IOException("Got exception to create instance for class: "+itemTypeClass+": "+e.getMessage(),e); - } catch (IllegalAccessException e) { - throw new IOException("Got exception to create instance for class: "+itemTypeClass+": "+e.getMessage(),e); - } - } - } + /** + * Deserialize the fields of this object from <code>in</code>. + * <p/> + * <p>For efficiency, implementations should attempt to re-use storage in the + * existing object where possible.</p> + * + * @param in <code>DataInput</code> to deseriablize this object from. + * @throws java.io.IOException + */ + @Override + public void readFields(DataInput in) throws IOException { + this.check(); + int size = in.readInt(); + for (int i = 0; i < size; i++) { + try { + E item = itemTypeClass.newInstance(); + item.readFields(in); + this.add(item); + } catch (InstantiationException e) { + throw new IOException("Got exception to create instance for class: " + itemTypeClass + ": " + e.getMessage(), e); + } catch (IllegalAccessException e) { + throw new IOException("Got exception to create instance for class: " + itemTypeClass + ": " + e.getMessage(), e); + } + } + } } http://git-wip-us.apache.org/repos/asf/eagle/blob/c970bb42/eagle-core/eagle-query/eagle-query-base/src/main/java/org/apache/eagle/query/aggregate/timeseries/AbstractAggregator.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-query/eagle-query-base/src/main/java/org/apache/eagle/query/aggregate/timeseries/AbstractAggregator.java b/eagle-core/eagle-query/eagle-query-base/src/main/java/org/apache/eagle/query/aggregate/timeseries/AbstractAggregator.java index deb0838..a4c6d98 100755 --- a/eagle-core/eagle-query/eagle-query-base/src/main/java/org/apache/eagle/query/aggregate/timeseries/AbstractAggregator.java +++ b/eagle-core/eagle-query/eagle-query-base/src/main/java/org/apache/eagle/query/aggregate/timeseries/AbstractAggregator.java @@ -32,161 +32,163 @@ import java.lang.reflect.Method; import java.util.ArrayList; import java.util.List; -public abstract class AbstractAggregator implements Aggregator, EntityCreationListener{ - private final static Logger LOG = LoggerFactory.getLogger(AbstractAggregator.class); +public abstract class AbstractAggregator implements Aggregator, EntityCreationListener { + private static final Logger LOG = LoggerFactory.getLogger(AbstractAggregator.class); - private static final String UNASSIGNED = "unassigned"; - protected List<String> groupbyFields; - protected List<AggregateFunctionType> aggregateFunctionTypes; - protected List<String> aggregatedFields; - // a cache to know immediately if groupby field should come from tags(true) or qualifiers(false) - private Boolean[] _groupbyFieldPlacementCache; - private Method[] _aggregateFieldReflectedMethodCache; + private static final String UNASSIGNED = "unassigned"; + protected List<String> groupbyFields; + protected List<AggregateFunctionType> aggregateFunctionTypes; + protected List<String> aggregatedFields; + // a cache to know immediately if groupby field should come from tags(true) or qualifiers(false) + private Boolean[] groupbyFieldPlacementCache; + private Method[] aggregateFieldReflectedMethodCache; - public AbstractAggregator(List<String> groupbyFields, List<AggregateFunctionType> aggregateFuntionTypes, List<String> aggregatedFields){ - this.groupbyFields = groupbyFields; - this.aggregateFunctionTypes = aggregateFuntionTypes; - this.aggregatedFields = aggregatedFields; - _aggregateFieldReflectedMethodCache = new Method[this.aggregatedFields.size()]; - _groupbyFieldPlacementCache = new Boolean[this.groupbyFields.size()]; - } - - @Override - public void entityCreated(TaggedLogAPIEntity entity) throws Exception{ - accumulate(entity); - } - - public abstract Object result(); - - protected String createGroupFromTags(TaggedLogAPIEntity entity, String groupbyField, int i){ - String groupbyFieldValue = entity.getTags().get(groupbyField); - if(groupbyFieldValue != null){ - _groupbyFieldPlacementCache[i] = true; - return groupbyFieldValue; - } - return null; - } - - protected String createGroupFromQualifiers(TaggedLogAPIEntity entity, String groupbyField, int i){ - try{ - PropertyDescriptor pd = PropertyUtils.getPropertyDescriptor(entity, groupbyField); - if(pd == null) - return null; -// _groupbyFieldPlacementCache.put(groupbyField, false); - _groupbyFieldPlacementCache[i] = false; - return (String)(pd.getReadMethod().invoke(entity)); - }catch(NoSuchMethodException ex){ - return null; - }catch(InvocationTargetException ex){ - return null; - }catch(IllegalAccessException ex){ - return null; - } - } - - protected String determineGroupbyFieldValue(TaggedLogAPIEntity entity, String groupbyField, int i){ - Boolean placement = _groupbyFieldPlacementCache[i]; - String groupbyFieldValue = null; - if(placement != null){ - groupbyFieldValue = placement.booleanValue() ? createGroupFromTags(entity, groupbyField, i) : createGroupFromQualifiers(entity, groupbyField, i); - }else{ - groupbyFieldValue = createGroupFromTags(entity, groupbyField, i); - if(groupbyFieldValue == null){ - groupbyFieldValue = createGroupFromQualifiers(entity, groupbyField, i); - } - } - groupbyFieldValue = (groupbyFieldValue == null ? UNASSIGNED : groupbyFieldValue); - return groupbyFieldValue; - } - - /** - * TODO For count aggregation, special treatment is the value is always 0 unless we support count(*) or count(<fieldname>) which counts number of rows or - * number of non-null field - * For other aggregation, like sum,min,max,avg, we should resort to qualifiers - * @param entity - * @return - */ - protected List<Double> createPreAggregatedValues(TaggedLogAPIEntity entity) throws Exception{ - List<Double> values = new ArrayList<Double>(); - int functionIndex = 0; - for(AggregateFunctionType type : aggregateFunctionTypes){ - if(type.name().equals(AggregateFunctionType.count.name())){ - values.add(new Double(1)); - }else{ - // find value in qualifier by checking java bean - String aggregatedField = aggregatedFields.get(functionIndex); - if(TokenConstant.isExpression(aggregatedField)){ - try { - String expr = TokenConstant.parseExpressionContent(aggregatedField); - values.add(ExpressionParser.eval(expr, entity)); - }catch (Exception ex){ - LOG.error("Failed to evaluate expression-based aggregation: " + aggregatedField, ex); - throw ex; - } - }else { - try { - Method m = _aggregateFieldReflectedMethodCache[functionIndex]; - if (m == null) { -// pd = PropertyUtils.getPropertyDescriptor(entity, aggregatedField); -// if (pd == null) { -// final String errMsg = "Field/tag " + aggregatedField + " is not defined for entity " + entity.getClass().getSimpleName(); -// logger.error(errMsg); -// throw new Exception(errMsg); -// } -// Object obj = pd.getReadMethod().invoke(entity); - String tmp = aggregatedField.substring(0, 1).toUpperCase() + aggregatedField.substring(1); - m = entity.getClass().getMethod("get" + tmp); - _aggregateFieldReflectedMethodCache[functionIndex] = m; - } - Object obj = m.invoke(entity); - values.add(numberToDouble(obj)); - } catch (Exception ex) { - LOG.error("Cannot do aggregation for field " + aggregatedField, ex); - throw ex; - } - } - } - functionIndex++; - } - return values; - } - - /** - * TODO this is a hack, we need elegant way to convert type to a broad precision + public AbstractAggregator(List<String> groupbyFields, List<AggregateFunctionType> aggregateFuntionTypes, List<String> aggregatedFields) { + this.groupbyFields = groupbyFields; + this.aggregateFunctionTypes = aggregateFuntionTypes; + this.aggregatedFields = aggregatedFields; + aggregateFieldReflectedMethodCache = new Method[this.aggregatedFields.size()]; + groupbyFieldPlacementCache = new Boolean[this.groupbyFields.size()]; + } + + @Override + public void entityCreated(TaggedLogAPIEntity entity) throws Exception { + accumulate(entity); + } + + public abstract Object result(); + + protected String createGroupFromTags(TaggedLogAPIEntity entity, String groupbyField, int i) { + String groupbyFieldValue = entity.getTags().get(groupbyField); + if (groupbyFieldValue != null) { + groupbyFieldPlacementCache[i] = true; + return groupbyFieldValue; + } + return null; + } + + protected String createGroupFromQualifiers(TaggedLogAPIEntity entity, String groupbyField, int i) { + try { + PropertyDescriptor pd = PropertyUtils.getPropertyDescriptor(entity, groupbyField); + if (pd == null) { + return null; + } + // groupbyFieldPlacementCache.put(groupbyField, false); + groupbyFieldPlacementCache[i] = false; + return (String)(pd.getReadMethod().invoke(entity)); + } catch (NoSuchMethodException ex) { + return null; + } catch (InvocationTargetException ex) { + return null; + } catch (IllegalAccessException ex) { + return null; + } + } + + protected String determineGroupbyFieldValue(TaggedLogAPIEntity entity, String groupbyField, int i) { + Boolean placement = groupbyFieldPlacementCache[i]; + String groupbyFieldValue = null; + if (placement != null) { + groupbyFieldValue = placement.booleanValue() ? createGroupFromTags(entity, groupbyField, i) : createGroupFromQualifiers(entity, groupbyField, i); + } else { + groupbyFieldValue = createGroupFromTags(entity, groupbyField, i); + if (groupbyFieldValue == null) { + groupbyFieldValue = createGroupFromQualifiers(entity, groupbyField, i); + } + } + groupbyFieldValue = (groupbyFieldValue == null ? UNASSIGNED : groupbyFieldValue); + return groupbyFieldValue; + } + + /** + * TODO For count aggregation, special treatment is the value is always 0 unless we support count(*) or count(<fieldname>) which counts number of rows or + * number of non-null field + * For other aggregation, like sum,min,max,avg, we should resort to qualifiers + * @param entity + * @return + */ + protected List<Double> createPreAggregatedValues(TaggedLogAPIEntity entity) throws Exception { + List<Double> values = new ArrayList<Double>(); + int functionIndex = 0; + for (AggregateFunctionType type : aggregateFunctionTypes) { + if (type.name().equals(AggregateFunctionType.count.name())) { + values.add(new Double(1)); + } else { + // find value in qualifier by checking java bean + String aggregatedField = aggregatedFields.get(functionIndex); + if (TokenConstant.isExpression(aggregatedField)) { + try { + String expr = TokenConstant.parseExpressionContent(aggregatedField); + values.add(ExpressionParser.eval(expr, entity)); + } catch (Exception ex) { + LOG.error("Failed to evaluate expression-based aggregation: " + aggregatedField, ex); + throw ex; + } + } else { + try { + Method m = aggregateFieldReflectedMethodCache[functionIndex]; + if (m == null) { + // pd = PropertyUtils.getPropertyDescriptor(entity, aggregatedField); + // if (pd == null) { + // final String errMsg = "Field/tag " + aggregatedField + " is not defined for entity " + entity.getClass().getSimpleName(); + // logger.error(errMsg); + // throw new Exception(errMsg); + // } + // Object obj = pd.getReadMethod().invoke(entity); + String tmp = aggregatedField.substring(0, 1).toUpperCase() + aggregatedField.substring(1); + m = entity.getClass().getMethod("get" + tmp); + aggregateFieldReflectedMethodCache[functionIndex] = m; + } + Object obj = m.invoke(entity); + values.add(numberToDouble(obj)); + } catch (Exception ex) { + LOG.error("Cannot do aggregation for field " + aggregatedField, ex); + throw ex; + } + } + } + functionIndex++; + } + return values; + } + + /** + * TODO this is a hack, we need elegant way to convert type to a broad precision * - * @param obj - * @return - */ - protected Double numberToDouble(Object obj){ - if(obj instanceof Double) - return (Double)obj; - if(obj instanceof Integer){ - return new Double(((Integer)obj).doubleValue()); - } - if(obj instanceof Long){ - return new Double(((Long)obj).doubleValue()); - } - // TODO hack to support string field for demo purpose, should be removed - if(obj == null){ - return new Double(0.0); - } - if(obj instanceof String){ - try{ - return new Double((String)obj); - }catch(Exception ex){ - LOG.warn("Datapoint ignored because it can not be converted to correct number for " + obj, ex); - return new Double(0.0); - } - } - if(obj instanceof double[]){ - double[] value = (double[]) obj; - if(value.length > 0){ - return new Double(value[0]); - }else{ - return new Double(0.0); - } - } - - throw new IllegalAggregateFieldTypeException(obj.getClass().toString() + " type is not support. The aggregated field must be numeric type, int, long or double"); - } + * @param obj + * @return + */ + protected Double numberToDouble(Object obj) { + if (obj instanceof Double) { + return (Double)obj; + } + if (obj instanceof Integer) { + return new Double(((Integer)obj).doubleValue()); + } + if (obj instanceof Long) { + return new Double(((Long)obj).doubleValue()); + } + // TODO hack to support string field for demo purpose, should be removed + if (obj == null) { + return new Double(0.0); + } + if (obj instanceof String) { + try { + return new Double((String)obj); + } catch (Exception ex) { + LOG.warn("Datapoint ignored because it can not be converted to correct number for " + obj, ex); + return new Double(0.0); + } + } + if (obj instanceof double[]) { + double[] value = (double[]) obj; + if (value.length > 0) { + return new Double(value[0]); + } else { + return new Double(0.0); + } + } + + throw new IllegalAggregateFieldTypeException(obj.getClass().toString() + " type is not support. The aggregated field must be numeric type, int, long or double"); + } } http://git-wip-us.apache.org/repos/asf/eagle/blob/c970bb42/eagle-core/eagle-query/eagle-query-base/src/main/java/org/apache/eagle/query/aggregate/timeseries/Aggregator.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-query/eagle-query-base/src/main/java/org/apache/eagle/query/aggregate/timeseries/Aggregator.java b/eagle-core/eagle-query/eagle-query-base/src/main/java/org/apache/eagle/query/aggregate/timeseries/Aggregator.java index 1e70e91..db62cfc 100755 --- a/eagle-core/eagle-query/eagle-query-base/src/main/java/org/apache/eagle/query/aggregate/timeseries/Aggregator.java +++ b/eagle-core/eagle-query/eagle-query-base/src/main/java/org/apache/eagle/query/aggregate/timeseries/Aggregator.java @@ -25,5 +25,5 @@ public interface Aggregator { * @param entity accumulated entity instance * @throws Exception */ - public void accumulate(TaggedLogAPIEntity entity) throws Exception; + void accumulate(TaggedLogAPIEntity entity) throws Exception; } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/eagle/blob/c970bb42/eagle-core/eagle-query/eagle-query-base/src/main/java/org/apache/eagle/query/aggregate/timeseries/EntityCreationListenerFactory.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-query/eagle-query-base/src/main/java/org/apache/eagle/query/aggregate/timeseries/EntityCreationListenerFactory.java b/eagle-core/eagle-query/eagle-query-base/src/main/java/org/apache/eagle/query/aggregate/timeseries/EntityCreationListenerFactory.java index 7e35bec..a7a69e0 100644 --- a/eagle-core/eagle-query/eagle-query-base/src/main/java/org/apache/eagle/query/aggregate/timeseries/EntityCreationListenerFactory.java +++ b/eagle-core/eagle-query/eagle-query-base/src/main/java/org/apache/eagle/query/aggregate/timeseries/EntityCreationListenerFactory.java @@ -19,7 +19,8 @@ package org.apache.eagle.query.aggregate.timeseries; import org.apache.eagle.log.entity.EntityCreationListener; public class EntityCreationListenerFactory { - public static EntityCreationListener synchronizedEntityCreationListener(EntityCreationListener listener){ - return new SynchronizedEntityCreationListener(listener); - } + + public static EntityCreationListener synchronizedEntityCreationListener(EntityCreationListener listener) { + return new SynchronizedEntityCreationListener(listener); + } }
