http://git-wip-us.apache.org/repos/asf/eagle/blob/6e919c2e/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/filter/HBaseFilterBuilder.java
----------------------------------------------------------------------
diff --git 
a/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/filter/HBaseFilterBuilder.java
 
b/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/filter/HBaseFilterBuilder.java
index 8209445..9e736ae 100755
--- 
a/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/filter/HBaseFilterBuilder.java
+++ 
b/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/filter/HBaseFilterBuilder.java
@@ -34,552 +34,542 @@ import java.util.regex.Matcher;
 import java.util.regex.Pattern;
 
 /**
- * the steps of building hbase filters
- * 1. receive ORExpression from eagle-antlr
- * 2. iterate all ANDExpression in ORExpression
- *    2.1 put each ANDExpression to a new filter list with MUST_PASS_ONE option
- *    2.2 iterate all AtomicExpression in ANDExpression
- *       2.2.1 group AtomicExpression into 2 groups by looking up metadata, 
one is for tag filters, the other is for column filters
- *       2.2.2 put the above 2 filters to a filter list with MUST_PASS_ALL 
option
+ * The steps of building hbase filters:
+ * <ol>
+ * <li>receive ORExpression from eagle-antlr</li>
+ * <li>iterate all ANDExpression in ORExpression
+ *   <ol>
+ *   <li>put each ANDExpression to a new filter list with MUST_PASS_ONE option</li>
+ *   <li>iterate all AtomicExpression in ANDExpression
+ *     <ol>
+ *     <li>group AtomicExpression into 2 groups by looking up metadata, one is for tag filters, the
+ *     other is for column filters</li>
+ *     <li>put the above 2 filters to a filter list with MUST_PASS_ALL option</li>
+ *     </ol>
+ *   </li>
+ *   </ol>
+ * </li>
+ * </ol>
  */
 public class HBaseFilterBuilder {
-       private static final Logger LOG = 
LoggerFactory.getLogger(HBaseFilterBuilder.class);
-       
-       /**
-        * syntax is @<fieldname>
-        */
-//     private static final String fnRegex = "^@(.*)$";
-       private static final Pattern _fnPattern = TokenConstant.ID_PATTERN;// 
Pattern.compile(fnRegex);
-       private static final Charset _defaultCharset = 
Charset.forName("ISO-8859-1");
-
-       private ORExpression _orExpr;
-       private EntityDefinition _ed;
-       private boolean _filterIfMissing;
-       private Charset _charset = _defaultCharset;
-
-       /**
-        * TODO: Verify performance impact
-        *
-        * @return
-        */
-       public Set<String> getFilterFields() {
-               return _filterFields;
-       }
-
-       /**
-        * Just add filter fields for expression filter
-        */
-       private Set<String> _filterFields;
-
-       public HBaseFilterBuilder(EntityDefinition ed, ORExpression orExpr) {
-               this(ed, orExpr, false);
-       }
-       
-       public HBaseFilterBuilder(EntityDefinition ed, ORExpression orExpr, 
boolean filterIfMissing) {
-               this._ed = ed;
-               this._orExpr = orExpr;
-               this._filterIfMissing = filterIfMissing;
-       }
-       
-       public void setCharset(String charsetName){
-               _charset = Charset.forName(charsetName);
-       }
-       
-       public Charset getCharset(){
-               return _charset;
-       }
-       
-       /**
-        * Because we don't have metadata for tag, we regard non-qualifer field 
as tag. So one field possibly is not a real tag when this function return true. 
This happens
-        * when a user input an wrong field name which is neither tag or 
qualifier
-        *   
-        * @param field
-        */
-       private boolean isTag(String field){
-               return _ed.isTag(field);
-       }
-       
-       /**
-        * check whether this field is one entity attribute or not 
-        * @param fieldName
-        * @return
-        */
-       private String parseEntityAttribute(String fieldName){
-               Matcher m = _fnPattern.matcher(fieldName);
-               if(m.find()){
-                       return m.group(1);
-               }
-               return null;
-       }
-
-       /**
-        * Return the partition values for each or expression. The size of the 
returned list should be equal to
-        * the size of FilterList that {@link #buildFilters()} returns.
-        * 
-        * TODO: For now we don't support one query to query multiple 
partitions. In future if partition is defined, 
-        * for the entity, internally We need to spawn multiple queries and 
send one query for each partition.
-        * 
-        * @return Return the partition values for each or expression. Return 
null if the entity doesn't support
-        * partition
-        */
-       public List<String[]> getPartitionValues() {
-               final String[] partitions = _ed.getPartitions();
-               if (partitions == null || partitions.length == 0) {
-                       return null;
-               }
-               final List<String[]> result = new ArrayList<String[]>();
-               final Map<String, String> partitionKeyValueMap = new 
HashMap<String, String>();
-               for(ANDExpression andExpr : _orExpr.getANDExprList()) {
-                       partitionKeyValueMap.clear();
-                       for(AtomicExpression ae : andExpr.getAtomicExprList()) {
-                               // TODO temporarily ignore those fields which 
are not for attributes
-                               if(ae.getKeyType() == TokenType.ID) {
-                                       final String fieldName = 
parseEntityAttribute(ae.getKey());
-                                       if (fieldName == null) {
-                                               LOG.warn(fieldName + " field 
does not have format @<FieldName>, ignored");
-                                               continue;
-                                       }
-                                       if (_ed.isPartitionTag(fieldName) && 
ComparisonOperator.EQUAL.equals(ae.getOp())) {
-                                               final String value = 
ae.getValue();
-                                               
partitionKeyValueMap.put(fieldName, value);
-                                       }
-                               }
-                       }
-                       final String[] values = new String[partitions.length];
-                       result.add(values);
-                       for (int i = 0; i < partitions.length; ++i) {
-                               final String partition = partitions[i];
-                               final String value = 
partitionKeyValueMap.get(partition);
-                               values[i] = value;
-                       }
-               }
-               return result;
-       }
-
-       /**
-        * @see org.apache.eagle.query.parser.TokenType
-        *
-        * @return
-        */
-       public FilterList buildFilters(){
-               // TODO: Optimize to select between row filter or column filter 
for better performance
-               // Use row key filter priority by default
-               boolean rowFilterPriority = true;
-
-               FilterList fltList = new FilterList(Operator.MUST_PASS_ONE);
-               for(ANDExpression andExpr : _orExpr.getANDExprList()){
-                       
-                       FilterList list = new 
FilterList(Operator.MUST_PASS_ALL);
-                       Map<String, List<String>> tagFilters = new 
HashMap<String, List<String>>();
-                       List<QualifierFilterEntity> qualifierFilters = new 
ArrayList<QualifierFilterEntity>();
-//                     List<QualifierFilterEntry> tagLikeQualifierFilters = 
new ArrayList<QualifierFilterEntry>();
-
-                       // TODO refactor not to use too much if/else
-                       for(AtomicExpression ae : andExpr.getAtomicExprList()){
-                               // TODO temporarily ignore those fields which 
are not for attributes
-
-                               String fieldName = ae.getKey();
-                               if(ae.getKeyType() == TokenType.ID){
-                                       fieldName = 
parseEntityAttribute(fieldName);
-                                       if(fieldName == null){
-                                               LOG.warn(fieldName + " field 
does not have format @<FieldName>, ignored");
-                                               continue;
-                                       }
-                               }
-
-                               String value = ae.getValue();
-                               ComparisonOperator op = ae.getOp();
-                               TokenType keyType = ae.getKeyType();
-                               TokenType valueType = ae.getValueType();
-                               QualifierFilterEntity entry = new 
QualifierFilterEntity(fieldName,value,op,keyType,valueType);
-
-                               // TODO Exact match, need to add escape for 
those special characters here, including:
-                               // "-", "[", "]", "/", "{", "}", "(", ")", "*", 
"+", "?", ".", "\\", "^", "$", "|"
-
-                               if(keyType == TokenType.ID && isTag(fieldName)){
-                                       if 
((ComparisonOperator.EQUAL.equals(op) || ComparisonOperator.IS.equals(op))
-                                                       && 
!TokenType.NULL.equals(valueType))
-                                       {
-                                               // Use RowFilter for equal TAG
-                                               if(tagFilters.get(fieldName) == 
null) tagFilters.put(fieldName, new ArrayList<String>());
-                                               
tagFilters.get(fieldName).add(value);
-                                       } else if (rowFilterPriority && 
ComparisonOperator.IN.equals(op))
-                                       {
-                                               // Use RowFilter here by default
-                                               if(tagFilters.get(fieldName) == 
null) tagFilters.put(fieldName, new ArrayList<String>());
-                                               
tagFilters.get(fieldName).addAll(EntityQualifierUtils.parseList(value));
-                                       } else if 
(ComparisonOperator.LIKE.equals(op)
-                                               || 
ComparisonOperator.NOT_LIKE.equals(op)
-                                               || 
ComparisonOperator.CONTAINS.equals(op)
-                                               || 
ComparisonOperator.NOT_CONTAINS.equals(op)
-                                               || 
ComparisonOperator.IN.equals(op)
-                                               || 
ComparisonOperator.IS.equals(op)
-                                               || 
ComparisonOperator.IS_NOT.equals(op)
-                                               || 
ComparisonOperator.NOT_EQUAL.equals(op)
-                                               || 
ComparisonOperator.EQUAL.equals(op)
-                                               || 
ComparisonOperator.NOT_IN.equals(op))
-                                       {
-                                               qualifierFilters.add(entry);
-                                       } else
-                                       {
-                                               LOG.warn("Don't support 
operation: \"" + op + "\" on tag field: " + fieldName + " yet, going to 
ignore");
-                                               throw new 
IllegalArgumentException("Don't support operation: "+op+" on tag field: 
"+fieldName+", avaliable options: =, =!, =~, !=~, in, not in, contains, not 
contains");
-                                       }
-                               }else{
-                                       qualifierFilters.add(entry);
-                               }
-                       }
-
-                       // Build RowFilter for equal tags
-                       list.addFilter(buildTagFilter(tagFilters));
-
-                       // Build SingleColumnValueFilter
-                       FilterList qualifierFilterList = 
buildQualifierFilter(qualifierFilters);
-                       if(qualifierFilterList != null && 
qualifierFilterList.getFilters().size()>0){
-                               list.addFilter(qualifierFilterList);
-                       }else {
-                               if(LOG.isDebugEnabled()) LOG.debug("Ignore 
empty qualifier filter from "+qualifierFilters.toString());
-                       }
-                       fltList.addFilter(list);
-               }
-               LOG.info("Query: " + _orExpr.toString() + " => Filter: " + 
fltList.toString());
-               return fltList;
-       }
-       
-       /**
-        * _charset is used to decode the byte array, in hbase server, 
RegexStringComparator uses the same
-        * charset to decode the byte array stored in qualifier
-        * for tag filter regex, it's always ISO-8859-1 as it only comes from 
String's hashcode (Integer)
-        * Note: regex comparasion is to compare String
-        */
-       protected Filter buildTagFilter(Map<String, List<String>> tagFilters){
-               RegexStringComparator regexStringComparator = new 
RegexStringComparator(buildTagFilterRegex(tagFilters));
-               regexStringComparator.setCharset(_charset);
-               RowFilter filter = new RowFilter(CompareOp.EQUAL, 
regexStringComparator);
-               return filter;
-       }
-       
-       /**
-        * all qualifiers' condition must be satisfied.
-        *
-        * <H1>Use RegexStringComparator for:</H1>
-        *      IN
-        *      LIKE
-        *      NOT_LIKE
-        *
-        * <H1>Use SubstringComparator for:</H1>
-        *      CONTAINS
-        *
-        * <H1>Use EntityQualifierHelper for:</H1>
-        *      EQUALS
-        *      NOT_EUQALS
-        *      LESS
-        *      LESS_OR_EQUAL
-        *      GREATER
-        *      GREATER_OR_EQUAL
-        *
-        * <H2>
-        *     TODO: Compare performance of RegexStringComparator 
,SubstringComparator ,EntityQualifierHelper
-        * </H2>
-        *
-        * @param qualifierFilters
-        * @return
-        */
-       protected FilterList buildQualifierFilter(List<QualifierFilterEntity> 
qualifierFilters){
-               FilterList list = new FilterList(Operator.MUST_PASS_ALL);
-               // iterate all the qualifiers
-               for(QualifierFilterEntity entry : qualifierFilters){
-                       // if contains expression based filter
-                       if(entry.getKeyType() == TokenType.EXP
-                                       || entry.getValueType() == TokenType.EXP
-                                       || entry.getKeyType() != TokenType.ID){
-                               
if(!EagleConfigFactory.load().isCoprocessorEnabled()) {
-                                       LOG.warn("Expression in filter may not 
support, because custom filter and coprocessor is disabled: " + 
entry.toString());
-                               }
-                               
list.addFilter(buildExpressionBasedFilter(entry));
-                               continue;
-                       }
-
-                       // else using SingleColumnValueFilter
-                       String qualifierName = entry.getKey();
-                       if(!isTag(entry.getKey())){
-                               Qualifier qualifier = 
_ed.getDisplayNameMap().get(entry.getKey());
-                               qualifierName = qualifier.getQualifierName();
-                       }
-
-                       // Comparator to be used for building HBase Filter
-                       // WritableByteArrayComparable comparator;
+    private static final Logger LOG = 
LoggerFactory.getLogger(HBaseFilterBuilder.class);
+
+    /*
+     * syntax is @<fieldname>
+     */
+    // private static final String fnRegex = "^@(.*)$";
+    private static final Pattern _fnPattern = TokenConstant.ID_PATTERN;// 
Pattern.compile(fnRegex);
+    private static final Charset _defaultCharset = 
Charset.forName("ISO-8859-1");
+
+    private ORExpression orExpr;
+    private EntityDefinition ed;
+    private boolean filterIfMissing;
+    private Charset charset = _defaultCharset;
+
+    /**
+     * TODO: Verify performance impact
+     *
+     * @return
+     */
+    public Set<String> getFilterFields() {
+        return filterFields;
+    }
+
+    /**
+     * Just add filter fields for expression filter
+     */
+    private Set<String> filterFields;
+
+    public HBaseFilterBuilder(EntityDefinition ed, ORExpression orExpr) {
+        this(ed, orExpr, false);
+    }
+
+    public HBaseFilterBuilder(EntityDefinition ed, ORExpression orExpr, 
boolean filterIfMissing) {
+        this.ed = ed;
+        this.orExpr = orExpr;
+        this.filterIfMissing = filterIfMissing;
+    }
+
+    public void setCharset(String charsetName) {
+        charset = Charset.forName(charsetName);
+    }
+
+    public Charset getCharset() {
+        return charset;
+    }
+
+    /**
+     * Because we don't have metadata for tags, we regard any non-qualifier field as a tag. So a field is
+     * possibly not a real tag even when this function returns true. This happens when a user inputs a
+     * wrong field name which is neither a tag nor a qualifier.
+     *
+     * @param field
+     */
+    private boolean isTag(String field) {
+        return ed.isTag(field);
+    }
+
+    /**
+     * check whether this field is one entity attribute or not
+     *
+     * @param fieldName
+     * @return
+     */
+    private String parseEntityAttribute(String fieldName) {
+        Matcher m = _fnPattern.matcher(fieldName);
+        if (m.find()) {
+            return m.group(1);
+        }
+        return null;
+    }
+
+    /**
+     * Return the partition values for each or expression. The size of the returned list should be equal
+     * to the size of FilterList that {@link #buildFilters()} returns.
+     *
+     * <p>One {@code String[]} is produced per AND-expression, in the order of
+     * {@code orExpr.getANDExprList()}. Element {@code i} holds the value that the AND-expression compares
+     * with {@code =} against {@code partitions[i]}, or {@code null} when it has no such equality
+     * condition. If the same partition is compared more than once, the last value wins.
+     *
+     * <p>TODO: For now we don't support one query to query multiple partitions. In future, if partitions
+     * are defined for the entity, internally we need to spawn multiple queries and send one query for
+     * each partition.
+     *
+     * @return the partition values for each or expression, or {@code null} if the entity doesn't support
+     *         partition
+     */
+    public List<String[]> getPartitionValues() {
+        final String[] partitions = ed.getPartitions();
+        if (partitions == null || partitions.length == 0) {
+            return null;
+        }
+        final List<String[]> result = new ArrayList<String[]>();
+        // Reused across AND-expressions; cleared at the start of every iteration.
+        final Map<String, String> partitionKeyValueMap = new HashMap<String, String>();
+        for (ANDExpression andExpr : orExpr.getANDExprList()) {
+            partitionKeyValueMap.clear();
+            for (AtomicExpression ae : andExpr.getAtomicExprList()) {
+                // TODO temporarily ignore those fields which are not for attributes
+                if (ae.getKeyType() != TokenType.ID) {
+                    continue;
+                }
+                final String fieldName = parseEntityAttribute(ae.getKey());
+                if (fieldName == null) {
+                    // fieldName is null on this path, so report the raw key the user supplied.
+                    LOG.warn(ae.getKey() + " field does not have format @<FieldName>, ignored");
+                    continue;
+                }
+                if (ed.isPartitionTag(fieldName) && ComparisonOperator.EQUAL.equals(ae.getOp())) {
+                    partitionKeyValueMap.put(fieldName, ae.getValue());
+                }
+            }
+            // Slots without an equality condition stay null.
+            final String[] values = new String[partitions.length];
+            for (int i = 0; i < partitions.length; ++i) {
+                values[i] = partitionKeyValueMap.get(partitions[i]);
+            }
+            result.add(values);
+        }
+        return result;
+    }
+
+    /**
+     * Builds the HBase filter tree for the whole OR-expression.
+     *
+     * <p>The returned list uses {@code MUST_PASS_ONE} and contains one {@code MUST_PASS_ALL} list per
+     * AND-expression. Inside each AND-expression:
+     * <ul>
+     * <li>an ID key that does not match the {@code @<FieldName>} format is skipped with a warning;</li>
+     * <li>a tag compared with {@code =} / {@code is} against a non-NULL value, or with {@code in}, is
+     * collected for the row filter built by {@code buildTagFilter};</li>
+     * <li>any other supported comparison on a tag, and every comparison on a non-tag key, is handed to
+     * {@code buildQualifierFilter}.</li>
+     * </ul>
+     * The filter from {@code buildTagFilter} is always added, even when no tag condition was collected.
+     *
+     * @see org.apache.eagle.query.parser.TokenType
+     * @return the filter list covering every AND-expression of the query
+     * @throws IllegalArgumentException if an unsupported operator is applied to a tag field
+     */
+    public FilterList buildFilters() {
+        // TODO: Optimize to select between row filter or column filter for better performance
+        // Use row key filter priority by default
+        boolean rowFilterPriority = true;
+
+        FilterList fltList = new FilterList(Operator.MUST_PASS_ONE);
+        for (ANDExpression andExpr : orExpr.getANDExprList()) {
+
+            FilterList list = new FilterList(Operator.MUST_PASS_ALL);
+            Map<String, List<String>> tagFilters = new HashMap<String, List<String>>();
+            List<QualifierFilterEntity> qualifierFilters = new ArrayList<QualifierFilterEntity>();
+
+            // TODO refactor not to use too much if/else
+            for (AtomicExpression ae : andExpr.getAtomicExprList()) {
+                // TODO temporarily ignore those fields which are not for attributes
+
+                String fieldName = ae.getKey();
+                if (ae.getKeyType() == TokenType.ID) {
+                    fieldName = parseEntityAttribute(fieldName);
+                    if (fieldName == null) {
+                        // fieldName was just overwritten with null, so report the raw key instead.
+                        LOG.warn(ae.getKey() + " field does not have format @<FieldName>, ignored");
+                        continue;
+                    }
+                }
+
+                String value = ae.getValue();
+                ComparisonOperator op = ae.getOp();
+                TokenType keyType = ae.getKeyType();
+                TokenType valueType = ae.getValueType();
+                QualifierFilterEntity entry = new QualifierFilterEntity(fieldName, value, op, keyType, valueType);
+
+                // TODO Exact match, need to add escape for those special characters here, including:
+                // "-", "[", "]", "/", "{", "}", "(", ")", "*", "+", "?", ".", "\\", "^", "$", "|"
+
+                if (keyType == TokenType.ID && isTag(fieldName)) {
+                    if ((ComparisonOperator.EQUAL.equals(op) || ComparisonOperator.IS.equals(op))
+                        && !TokenType.NULL.equals(valueType)) {
+                        // Use RowFilter for equal TAG
+                        List<String> tagValues = tagFilters.get(fieldName);
+                        if (tagValues == null) {
+                            tagValues = new ArrayList<String>();
+                            tagFilters.put(fieldName, tagValues);
+                        }
+                        tagValues.add(value);
+                    } else if (rowFilterPriority && ComparisonOperator.IN.equals(op)) {
+                        // Use RowFilter here by default
+                        List<String> tagValues = tagFilters.get(fieldName);
+                        if (tagValues == null) {
+                            tagValues = new ArrayList<String>();
+                            tagFilters.put(fieldName, tagValues);
+                        }
+                        tagValues.addAll(EntityQualifierUtils.parseList(value));
+                    } else if (ComparisonOperator.LIKE.equals(op) || ComparisonOperator.NOT_LIKE.equals(op)
+                               || ComparisonOperator.CONTAINS.equals(op)
+                               || ComparisonOperator.NOT_CONTAINS.equals(op)
+                               || ComparisonOperator.IN.equals(op) || ComparisonOperator.IS.equals(op)
+                               || ComparisonOperator.IS_NOT.equals(op)
+                               || ComparisonOperator.NOT_EQUAL.equals(op)
+                               || ComparisonOperator.EQUAL.equals(op)
+                               || ComparisonOperator.NOT_IN.equals(op)) {
+                        // Remaining supported tag comparisons (including NULL checks) go to column filters
+                        qualifierFilters.add(entry);
+                    } else {
+                        LOG.warn("Don't support operation: \"" + op + "\" on tag field: " + fieldName
+                                 + " yet, going to ignore");
+                        throw new IllegalArgumentException("Don't support operation: " + op
+                                                           + " on tag field: " + fieldName
+                                                           + ", avaliable options: =, =!, =~, !=~, in, not in, contains, not contains");
+                    }
+                } else {
+                    qualifierFilters.add(entry);
+                }
+            }
+
+            // Build RowFilter for equal tags
+            list.addFilter(buildTagFilter(tagFilters));
+
+            // Build SingleColumnValueFilter
+            FilterList qualifierFilterList = buildQualifierFilter(qualifierFilters);
+            if (qualifierFilterList != null && qualifierFilterList.getFilters().size() > 0) {
+                list.addFilter(qualifierFilterList);
+            } else {
+                if (LOG.isDebugEnabled()) {
+                    LOG.debug("Ignore empty qualifier filter from " + qualifierFilters.toString());
+                }
+            }
+            fltList.addFilter(list);
+        }
+        LOG.info("Query: " + orExpr.toString() + " => Filter: " + fltList.toString());
+        return fltList;
+    }
+
+    /**
+     * charset is used to decode the byte array; in the hbase server, RegexStringComparator uses the same
+     * charset to decode the byte array stored in the qualifier. For the tag filter regex, it's always
+     * ISO-8859-1 as it only comes from String's hashcode (Integer). Note: regex comparison is to compare
+     * String.
+     */
+    protected Filter buildTagFilter(Map<String, List<String>> tagFilters) {
+        RegexStringComparator regexStringComparator = new 
RegexStringComparator(buildTagFilterRegex(tagFilters));
+        regexStringComparator.setCharset(charset);
+        RowFilter filter = new RowFilter(CompareOp.EQUAL, 
regexStringComparator);
+        return filter;
+    }
+
+    /**
+     * all qualifiers' condition must be satisfied.
+     * <H1>Use RegexStringComparator for:</H1> IN, LIKE, NOT_LIKE
+     * <H1>Use SubstringComparator for:</H1> CONTAINS
+     * <H1>Use EntityQualifierHelper for:</H1> EQUALS, NOT_EQUALS, LESS, LESS_OR_EQUAL, GREATER,
+     * GREATER_OR_EQUAL
+     * <H2>TODO: Compare performance of RegexStringComparator, SubstringComparator, EntityQualifierHelper</H2>
+     *
+     * @param qualifierFilters
+     * @return
+     */
+    protected FilterList buildQualifierFilter(List<QualifierFilterEntity> 
qualifierFilters) {
+        FilterList list = new FilterList(Operator.MUST_PASS_ALL);
+        // iterate all the qualifiers
+        for (QualifierFilterEntity entry : qualifierFilters) {
+            // if contains expression based filter
+            if (entry.getKeyType() == TokenType.EXP || entry.getValueType() == 
TokenType.EXP
+                || entry.getKeyType() != TokenType.ID) {
+                if (!EagleConfigFactory.load().isCoprocessorEnabled()) {
+                    LOG.warn("Expression in filter may not support, because 
custom filter and coprocessor is disabled: "
+                             + entry.toString());
+                }
+                list.addFilter(buildExpressionBasedFilter(entry));
+                continue;
+            }
+
+            // else using SingleColumnValueFilter
+            String qualifierName = entry.getKey();
+            if (!isTag(entry.getKey())) {
+                Qualifier qualifier = 
ed.getDisplayNameMap().get(entry.getKey());
+                qualifierName = qualifier.getQualifierName();
+            }
+
+            // Comparator to be used for building HBase Filter
+            // WritableByteArrayComparable comparator;
             ByteArrayComparable comparable;
-                       if(ComparisonOperator.IN.equals(entry.getOp())
-                               || 
ComparisonOperator.NOT_IN.equals(entry.getOp())){
-                               Filter setFilter = 
buildListQualifierFilter(entry);
-                               if(setFilter!=null){
-                                       list.addFilter(setFilter);
-                               }
-                       }else{
-                               // If [=,!=,is,is not] NULL, use NullComparator 
else throw exception
-                               if(TokenType.NULL.equals(entry.getValueType())){
-                                       
if(ComparisonOperator.EQUAL.equals(entry.getOp())
-                                               
||ComparisonOperator.NOT_EQUAL.equals(entry.getOp())
-                                               
||ComparisonOperator.IS.equals(entry.getOp())
-                                               
||ComparisonOperator.IS_NOT.equals(entry.getOp()))
+            if (ComparisonOperator.IN.equals(entry.getOp())
+                || ComparisonOperator.NOT_IN.equals(entry.getOp())) {
+                Filter setFilter = buildListQualifierFilter(entry);
+                if (setFilter != null) {
+                    list.addFilter(setFilter);
+                }
+            } else {
+                // If [=,!=,is,is not] NULL, use NullComparator else throw 
exception
+                if (TokenType.NULL.equals(entry.getValueType())) {
+                    if (ComparisonOperator.EQUAL.equals(entry.getOp())
+                        || ComparisonOperator.NOT_EQUAL.equals(entry.getOp())
+                        || ComparisonOperator.IS.equals(entry.getOp())
+                        || ComparisonOperator.IS_NOT.equals(entry.getOp())) {
                         comparable = new NullComparator();
-                                       else
-                                               throw new 
IllegalArgumentException("Operation: "+entry.getOp()+" with NULL is not 
supported yet: "+entry.toString()+", avaliable options: [=, !=, is, is not] 
null|NULL");
-                               }
-                               // If [contains, not contains],use 
SubstringComparator
-                               else if 
(ComparisonOperator.CONTAINS.equals(entry.getOp())
-                                       || 
ComparisonOperator.NOT_CONTAINS.equals(entry.getOp())) {
+                    } else {
+                        throw new IllegalArgumentException("Operation: " + 
entry.getOp()
+                                                           + " with NULL is 
not supported yet: "
+                                                           + entry.toString()
+                                                           + ", avaliable 
options: [=, !=, is, is not] null|NULL");
+                    }
+                } else if (ComparisonOperator.CONTAINS.equals(entry.getOp())
+                         || 
ComparisonOperator.NOT_CONTAINS.equals(entry.getOp())) {
+                    // If [contains, not contains],use SubstringComparator
                     comparable = new SubstringComparator(entry.getValue());
-                               }
-                               // If [like, not like], use 
RegexStringComparator
-                               else if 
(ComparisonOperator.LIKE.equals(entry.getOp())
-                                               || 
ComparisonOperator.NOT_LIKE.equals(entry.getOp())){
-                                       // Use RegexStringComparator for LIKE / 
NOT_LIKE
-                                       RegexStringComparator _comparator = new 
RegexStringComparator(buildQualifierRegex(entry.getValue()));
-                                       _comparator.setCharset(_charset);
+                } else if (ComparisonOperator.LIKE.equals(entry.getOp())
+                         || ComparisonOperator.NOT_LIKE.equals(entry.getOp())) 
{
+                    // If [like, not like], use RegexStringComparator
+                    // Use RegexStringComparator for LIKE / NOT_LIKE
+                    RegexStringComparator _comparator = new 
RegexStringComparator(buildQualifierRegex(entry
+                        .getValue()));
+                    _comparator.setCharset(charset);
                     comparable = _comparator;
-                               } else{
-                                       Class type = 
EntityQualifierUtils.getType(_ed, entry.getKey());
-                                       // if type is null (is Tag or not 
found) or not defined for TypedByteArrayComparator
-                                       
if(!EagleConfigFactory.load().isCoprocessorEnabled() || type == null || 
TypedByteArrayComparator.get(type) == null){
-                        comparable = new 
BinaryComparator(EntityQualifierUtils.toBytes(_ed, entry.getKey(), 
entry.getValue()));
-                                       }else {
-                        comparable = new 
TypedByteArrayComparator(EntityQualifierUtils.toBytes(_ed, entry.getKey(), 
entry.getValue()),type);
-                                       }
-                               }
-
-                               SingleColumnValueFilter filter =
-                                               new 
SingleColumnValueFilter(_ed.getColumnFamily().getBytes(), 
qualifierName.getBytes(), convertToHBaseCompareOp(entry.getOp()), comparable);
-                               filter.setFilterIfMissing(_filterIfMissing);
-                               list.addFilter(filter);
-                       }
-               }
-
-               return list;
-       }
-
-       private Filter buildExpressionBasedFilter(QualifierFilterEntity entry) {
-               BooleanExpressionComparator expressionComparator  = new 
BooleanExpressionComparator(entry,_ed);
-               _filterFields = expressionComparator.getRequiredFields();
-               RowValueFilter filter = new 
RowValueFilter(expressionComparator);
-               return filter;
-       }
-
-       /**
-        * Currently use BinaryComparator only
-        * <h2>TODO: </h2>
-        * Possibility to tune performance by using: OR[BinaryComparator,...] 
instead of RegexStringComparator?
-        *
-        *<br/> <br/>
-        *
-        * ! Check op must be IN or NOTIN in caller
-        *
-        * @param entry
-        * @return
-        */
-       private Filter buildListQualifierFilter(QualifierFilterEntity entry){
-               List<String> valueSet = 
EntityQualifierUtils.parseList(entry.getValue());
-               Iterator<String> it = valueSet.iterator();
-               String fieldName = entry.getKey();
-               String qualifierName = fieldName;
-               if(!_ed.isTag(entry.getKey())){
-                       qualifierName = 
_ed.getDisplayNameMap().get(entry.getKey()).getQualifierName();
-               }
-
-// TODO: Try to use RegExp just work if possible
-// Because single SingleColumnValueFilter is much faster than multi 
SingleColumnValueFilters in OR list.
-//             Class qualifierType = 
EntityQualifierHelper.getType(_ed,fieldName);
-//             if( qualifierType == null || qualifierType == String.class){
-//                     boolean first = true;
-//                     StringBuilder filterRegex = new StringBuilder();
-//                     filterRegex.append("^(");
-//                     while(it.hasNext()) {
-//                             String value = it.next();
-//                             if(value == null) {
-//                                     logger.warn("ignore empty value in set 
qualifier filter: "+entry.toString());
-//                                     continue;
-//                             }
-//                             if(!first) filterRegex.append("|");
-//                             filterRegex.append(value);
-//                             first = false;
-//                     }
-//                     filterRegex.append(")$");
-//                     RegexStringComparator regexStringComparator = new 
RegexStringComparator(filterRegex.toString());
-//                     return new 
SingleColumnValueFilter(_ed.getColumnFamily().getBytes(), 
qualifierName.getBytes(),
-//                                     convertToHBaseCompareOp(entry.getOp()), 
regexStringComparator);
-//             }else{
-               FilterList setFilterList;
-               if(ComparisonOperator.IN.equals(entry.getOp())){
-                       setFilterList = new FilterList(Operator.MUST_PASS_ONE);
-               }else if(ComparisonOperator.NOT_IN.equals(entry.getOp())) {
-                       setFilterList = new FilterList(Operator.MUST_PASS_ALL);
-               }else{
-                       throw new IllegalArgumentException(String.format("Don't 
support operation: %s on LIST type of value yet: %s, valid options: IN/NOT IN 
[LIST]",entry.getOp(),entry.toString()));
-               }
-
-               while(it.hasNext()) {
-                       String value = it.next();
-                       BinaryComparator comparator = new 
BinaryComparator(EntityQualifierUtils.toBytes(_ed, fieldName, value));
-                       SingleColumnValueFilter filter =
-                                       new 
SingleColumnValueFilter(_ed.getColumnFamily().getBytes(), 
qualifierName.getBytes(), convertToHBaseCompareOp(entry.getOp()), comparator);
-                       filter.setFilterIfMissing(_filterIfMissing);
-                       setFilterList.addFilter(filter);
-               }
-
-               return setFilterList;
-//             }
-       }
-
-       /**
-        * Just used for LIKE and NOT_LIKE
-        *
-        * @param qualifierValue
-        * @return
-        */
-       protected String buildQualifierRegex(String qualifierValue){
-               StringBuilder sb = new StringBuilder();
-//             sb.append("(?s)");
-               sb.append("^");
-               sb.append(qualifierValue);
-               sb.append("$");
-               return sb.toString();
-       }
-       
-         /**
-          * Appends the given ID to the given buffer, followed by "\\E".
-          * [steal it from opentsdb, thanks opentsdb :) 
https://github.com/OpenTSDB/opentsdb/blob/master/src/core/TsdbQuery.java]
-          */
-         private static void addId(final StringBuilder buf, final byte[] id) {
-               buf.append("\\Q");
-           boolean backslash = false;
-           for (final byte b : id) {
-             buf.append((char) (b & 0xFF));
-             if (b == 'E' && backslash) {  // If we saw a `\' and now we have 
a `E'.
-               // So we just terminated the quoted section because we just 
added \E
-               // to `buf'.  So let's put a litteral \E now and start quoting 
again.
-               buf.append("\\\\E\\Q");
-             } else {
-               backslash = b == '\\';
-             }
-           }
-           buf.append("\\E");
-         }
-         
-         @SuppressWarnings("unused")
-         private static void addId(final StringBuilder buf, final String id) {
-                   buf.append("\\Q");
-                       int len = id.length()-1;
-                   boolean backslash = false;
-                   for (int i =0; i < len; i++) {
-                     char c = id.charAt(i);
-                     buf.append(c);
-                     if (c == 'E' && backslash) {  // If we saw a `\' and now 
we have a `E'.
-                       // So we just terminated the quoted section because we 
just added \E
-                       // to `buf'.  So let's put a litteral \E now and start 
quoting again.
-                       buf.append("\\\\E\\Q");
-                     } else {
-                       backslash = c == '\\';
-                     }
-                   }
-                   buf.append("\\E");
-                 }
-       
-       /**
-        * one search tag may have multiple values which have OR relationship, 
and relationship between
-        * different search tags is AND
-        * the query is like "(TAG1=value11 OR TAG1=value12) AND TAG2=value2"
-        * @param tags
-        * @return
-        */
-       protected String buildTagFilterRegex(Map<String, List<String>> tags){
-               // TODO need consider that \E could be part of tag, refer to 
https://github.com/OpenTSDB/opentsdb/blob/master/src/core/TsdbQuery.java
-               final SortedMap<Integer, List<Integer>> tagHash = new 
TreeMap<Integer, List<Integer>>();
-               final int numOfPartitionFields = (_ed.getPartitions() == null) 
? 0 : _ed.getPartitions().length;
-               for(Map.Entry<String, List<String>> entry : tags.entrySet()){
-                       String tagName = entry.getKey();
-                       // Ignore tag if the tag is one of partition fields
-                       if (_ed.isPartitionTag(tagName)) {
-                               continue;
-                       }
-                       List<String> stringValues = entry.getValue();
-                       List<Integer> hashValues = new 
ArrayList<Integer>(stringValues.size());
-                       for(String value : stringValues){
-                               hashValues.add(value.hashCode());
-                       }
-                       tagHash.put(tagName.hashCode(), hashValues);
-               }
-               
-               // header = prefix(4 bytes) + partition_hashes(4*N bytes) + 
timestamp (8 bytes)
-               final int headerLength = 4 + numOfPartitionFields * 4 + 8;
-
-               // <tag1:4><value1:4> ... <tagn:4><valuen:4>
-               StringBuilder sb = new StringBuilder();
-               sb.append("(?s)");
-               sb.append("^(?:.{").append(headerLength).append("})");
-               sb.append("(?:.{").append(8).append("})*"); // for any number 
of tags
-               for (Map.Entry<Integer, List<Integer>> entry : 
tagHash.entrySet()) {
-                       try {
-                               addId(sb, ByteUtil.intToBytes(entry.getKey()));
-                               List<Integer> hashValues = entry.getValue();
-                               sb.append("(?:");
-                               boolean first = true;
-                               for(Integer value : hashValues){
-                                       if(!first){
-                                               sb.append('|');
-                                       }
-                                       addId(sb, ByteUtil.intToBytes(value));
-                                       first = false;
-                               }
-                               sb.append(")");
-                               sb.append("(?:.{").append(8).append("})*"); // 
for any number of tags
-                       } catch (Exception ex) {
-                               LOG.error("constructing regex error", ex);
-                       }
-               }
-               sb.append("$");
-               if(LOG.isDebugEnabled()) LOG.debug("Tag filter pattern is " + 
sb.toString());
-               return sb.toString();
-       }
-
-       /**
-        * Convert ComparisonOperator to native HBase CompareOp
-        *
-        * Support:
-        *      =, =~,CONTAINS,<,<=,>,>=,!=,!=~
-        *
-        * @param comp
-        * @return
-        */
-       protected static CompareOp convertToHBaseCompareOp(ComparisonOperator 
comp) {
-               if(comp == ComparisonOperator.EQUAL || comp == 
ComparisonOperator.LIKE
-                               || comp == ComparisonOperator.CONTAINS
-                               || comp == ComparisonOperator.IN
-                               || comp == ComparisonOperator.IS
-                               ) {
-                       return CompareOp.EQUAL;
-               }else if(comp == ComparisonOperator.LESS) {
-                       return CompareOp.LESS;
-               } else if(comp == ComparisonOperator.LESS_OR_EQUAL){
-                       return CompareOp.LESS_OR_EQUAL;
-               }else if(comp == ComparisonOperator.GREATER) {
-                       return CompareOp.GREATER;
-               } else if(comp == ComparisonOperator.GREATER_OR_EQUAL){
-                       return CompareOp.GREATER_OR_EQUAL;
-               } else if(comp == ComparisonOperator.NOT_EQUAL
-                               || comp == ComparisonOperator.NOT_LIKE
-                               || comp == ComparisonOperator.NOT_CONTAINS
-                               || comp == ComparisonOperator.IS_NOT
-                               || comp == ComparisonOperator.NOT_IN)
-               {
-                       return CompareOp.NOT_EQUAL;
-               } else {
-                       LOG.error("{} operation is not supported now\n", comp);
-                       throw new IllegalArgumentException("Illegal operation: 
"+comp+ ", avaliable options: "+ Arrays.toString(ComparisonOperator.values()));
-               }
-       }
-
-       protected static CompareOp getHBaseCompareOp(String comp) {
-               return 
convertToHBaseCompareOp(ComparisonOperator.locateOperator(comp));
-       }
+                } else {
+                    Class type = EntityQualifierUtils.getType(ed, 
entry.getKey());
+                    // if type is null (is Tag or not found) or not defined 
for TypedByteArrayComparator
+                    if (!EagleConfigFactory.load().isCoprocessorEnabled() || 
type == null
+                        || TypedByteArrayComparator.get(type) == null) {
+                        comparable = new 
BinaryComparator(EntityQualifierUtils.toBytes(ed, entry.getKey(),
+                                                                               
        entry.getValue()));
+                    } else {
+                        comparable = new 
TypedByteArrayComparator(EntityQualifierUtils
+                            .toBytes(ed, entry.getKey(), entry.getValue()), 
type);
+                    }
+                }
+
+                SingleColumnValueFilter filter = new 
SingleColumnValueFilter(ed.getColumnFamily()
+                    .getBytes(), qualifierName.getBytes(), 
convertToHBaseCompareOp(entry.getOp()),
+                                                                             
comparable);
+                filter.setFilterIfMissing(filterIfMissing);
+                list.addFilter(filter);
+            }
+        }
+
+        return list;
+    }
+
+    private Filter buildExpressionBasedFilter(QualifierFilterEntity entry) {
+        BooleanExpressionComparator expressionComparator = new 
BooleanExpressionComparator(entry, ed);
+        filterFields = expressionComparator.getRequiredFields();
+        RowValueFilter filter = new RowValueFilter(expressionComparator);
+        return filter;
+    }
+
+    /**
+     * Currently use BinaryComparator only
+     * <h2>TODO:</h2> Possibility to tune performance by using: 
OR[BinaryComparator,...] instead of
+     * RegexStringComparator? <br/>
+     * <br/>
+     * ! The caller must check that op is IN or NOT_IN
+     *
+     * @param entry
+     * @return
+     */
+    private Filter buildListQualifierFilter(QualifierFilterEntity entry) {
+        List<String> valueSet = 
EntityQualifierUtils.parseList(entry.getValue());
+        Iterator<String> it = valueSet.iterator();
+        String fieldName = entry.getKey();
+        String qualifierName = fieldName;
+        if (!ed.isTag(entry.getKey())) {
+            qualifierName = 
ed.getDisplayNameMap().get(entry.getKey()).getQualifierName();
+        }
+
+        // TODO: Try to use RegExp just work if possible
+        // Because single SingleColumnValueFilter is much faster than multi 
SingleColumnValueFilters in OR
+        // list.
+        // Class qualifierType = EntityQualifierHelper.getType(ed,fieldName);
+        // if( qualifierType == null || qualifierType == String.class){
+        // boolean first = true;
+        // StringBuilder filterRegex = new StringBuilder();
+        // filterRegex.append("^(");
+        // while(it.hasNext()) {
+        // String value = it.next();
+        // if(value == null) {
+        // logger.warn("ignore empty value in set qualifier filter: 
"+entry.toString());
+        // continue;
+        // }
+        // if(!first) filterRegex.append("|");
+        // filterRegex.append(value);
+        // first = false;
+        // }
+        // filterRegex.append(")$");
+        // RegexStringComparator regexStringComparator = new 
RegexStringComparator(filterRegex.toString());
+        // return new SingleColumnValueFilter(ed.getColumnFamily().getBytes(), 
qualifierName.getBytes(),
+        // convertToHBaseCompareOp(entry.getOp()), regexStringComparator);
+        // }else{
+        FilterList setFilterList;
+        if (ComparisonOperator.IN.equals(entry.getOp())) {
+            setFilterList = new FilterList(Operator.MUST_PASS_ONE);
+        } else if (ComparisonOperator.NOT_IN.equals(entry.getOp())) {
+            setFilterList = new FilterList(Operator.MUST_PASS_ALL);
+        } else {
+            throw new IllegalArgumentException(String
+                .format("Don't support operation: %s on LIST type of value 
yet: %s, valid options: IN/NOT IN [LIST]",
+                        entry.getOp(), entry.toString()));
+        }
+
+        while (it.hasNext()) {
+            String value = it.next();
+            BinaryComparator comparator = new 
BinaryComparator(EntityQualifierUtils.toBytes(ed, fieldName,
+                                                                               
             value));
+            SingleColumnValueFilter filter = new 
SingleColumnValueFilter(ed.getColumnFamily()
+                .getBytes(), qualifierName.getBytes(), 
convertToHBaseCompareOp(entry.getOp()), comparator);
+            filter.setFilterIfMissing(filterIfMissing);
+            setFilterList.addFilter(filter);
+        }
+
+        return setFilterList;
+        // }
+    }
+
+    /**
+     * Just used for LIKE and NOT_LIKE
+     *
+     * @param qualifierValue
+     * @return
+     */
+    protected String buildQualifierRegex(String qualifierValue) {
+        StringBuilder sb = new StringBuilder();
+        // sb.append("(?s)");
+        sb.append("^");
+        sb.append(qualifierValue);
+        sb.append("$");
+        return sb.toString();
+    }
+
+    /**
+     * Appends the given ID to the given buffer, followed by "\\E". [steal it 
from opentsdb, thanks opentsdb
+     * :) 
https://github.com/OpenTSDB/opentsdb/blob/master/src/core/TsdbQuery.java]
+     */
+    private static void addId(final StringBuilder buf, final byte[] id) {
+        buf.append("\\Q");
+        boolean backslash = false;
+        for (final byte b : id) {
+            buf.append((char)(b & 0xFF));
+            if (b == 'E' && backslash) { // If we saw a `\' and now we have a 
`E'.
+                // So we just terminated the quoted section because we just 
added \E
+                // to `buf'. So let's put a literal \E now and start quoting 
again.
+                buf.append("\\\\E\\Q");
+            } else {
+                backslash = b == '\\';
+            }
+        }
+        buf.append("\\E");
+    }
+
+    @SuppressWarnings("unused")
+    private static void addId(final StringBuilder buf, final String id) {
+        buf.append("\\Q");
+        int len = id.length() - 1;
+        boolean backslash = false;
+        for (int i = 0; i < len; i++) {
+            char c = id.charAt(i);
+            buf.append(c);
+            if (c == 'E' && backslash) { // If we saw a `\' and now we have a 
`E'.
+                // So we just terminated the quoted section because we just 
added \E
+                // to `buf'. So let's put a literal \E now and start quoting 
again.
+                buf.append("\\\\E\\Q");
+            } else {
+                backslash = c == '\\';
+            }
+        }
+        buf.append("\\E");
+    }
+
+    /**
+     * One search tag may have multiple values, which have an OR relationship, and the 
relationship between different
+     * search tags is AND. The query is like "(TAG1=value11 OR TAG1=value12) 
AND TAG2=value2"
+     *
+     * @param tags
+     * @return
+     */
+    protected String buildTagFilterRegex(Map<String, List<String>> tags) {
+        // TODO: need to consider that \E could be part of a tag; refer to
+        // 
https://github.com/OpenTSDB/opentsdb/blob/master/src/core/TsdbQuery.java
+        final SortedMap<Integer, List<Integer>> tagHash = new TreeMap<Integer, 
List<Integer>>();
+        final int numOfPartitionFields = (ed.getPartitions() == null) ? 0 : 
ed.getPartitions().length;
+        for (Map.Entry<String, List<String>> entry : tags.entrySet()) {
+            String tagName = entry.getKey();
+            // Ignore tag if the tag is one of partition fields
+            if (ed.isPartitionTag(tagName)) {
+                continue;
+            }
+            List<String> stringValues = entry.getValue();
+            List<Integer> hashValues = new 
ArrayList<Integer>(stringValues.size());
+            for (String value : stringValues) {
+                hashValues.add(value.hashCode());
+            }
+            tagHash.put(tagName.hashCode(), hashValues);
+        }
+
+        // header = prefix(4 bytes) + partition_hashes(4*N bytes) + timestamp 
(8 bytes)
+        final int headerLength = 4 + numOfPartitionFields * 4 + 8;
+
+        // <tag1:4><value1:4> ... <tagn:4><valuen:4>
+        StringBuilder sb = new StringBuilder();
+        sb.append("(?s)");
+        sb.append("^(?:.{").append(headerLength).append("})");
+        sb.append("(?:.{").append(8).append("})*"); // for any number of tags
+        for (Map.Entry<Integer, List<Integer>> entry : tagHash.entrySet()) {
+            try {
+                addId(sb, ByteUtil.intToBytes(entry.getKey()));
+                List<Integer> hashValues = entry.getValue();
+                sb.append("(?:");
+                boolean first = true;
+                for (Integer value : hashValues) {
+                    if (!first) {
+                        sb.append('|');
+                    }
+                    addId(sb, ByteUtil.intToBytes(value));
+                    first = false;
+                }
+                sb.append(")");
+                sb.append("(?:.{").append(8).append("})*"); // for any number 
of tags
+            } catch (Exception ex) {
+                LOG.error("constructing regex error", ex);
+            }
+        }
+        sb.append("$");
+        if (LOG.isDebugEnabled()) {
+            LOG.debug("Tag filter pattern is " + sb.toString());
+        }
+        return sb.toString();
+    }
+
+    /**
+     * Convert ComparisonOperator to native HBase CompareOp. Supports: =, 
=~,CONTAINS,<,<=,>,>=,!=,!=~
+     *
+     * @param comp
+     * @return
+     */
+    protected static CompareOp convertToHBaseCompareOp(ComparisonOperator 
comp) {
+        if (comp == ComparisonOperator.EQUAL || comp == ComparisonOperator.LIKE
+            || comp == ComparisonOperator.CONTAINS || comp == 
ComparisonOperator.IN
+            || comp == ComparisonOperator.IS) {
+            return CompareOp.EQUAL;
+        } else if (comp == ComparisonOperator.LESS) {
+            return CompareOp.LESS;
+        } else if (comp == ComparisonOperator.LESS_OR_EQUAL) {
+            return CompareOp.LESS_OR_EQUAL;
+        } else if (comp == ComparisonOperator.GREATER) {
+            return CompareOp.GREATER;
+        } else if (comp == ComparisonOperator.GREATER_OR_EQUAL) {
+            return CompareOp.GREATER_OR_EQUAL;
+        } else if (comp == ComparisonOperator.NOT_EQUAL || comp == 
ComparisonOperator.NOT_LIKE
+                   || comp == ComparisonOperator.NOT_CONTAINS || comp == 
ComparisonOperator.IS_NOT
+                   || comp == ComparisonOperator.NOT_IN) {
+            return CompareOp.NOT_EQUAL;
+        } else {
+            LOG.error("{} operation is not supported now\n", comp);
+            throw new IllegalArgumentException("Illegal operation: " + comp + 
", avaliable options: "
+                                               + 
Arrays.toString(ComparisonOperator.values()));
+        }
+    }
+
+    protected static CompareOp getHBaseCompareOp(String comp) {
+        return 
convertToHBaseCompareOp(ComparisonOperator.locateOperator(comp));
+    }
 }

http://git-wip-us.apache.org/repos/asf/eagle/blob/6e919c2e/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/filter/QualifierFilterEntity.java
----------------------------------------------------------------------
diff --git 
a/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/filter/QualifierFilterEntity.java
 
b/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/filter/QualifierFilterEntity.java
index 6cdc77b..340c33b 100755
--- 
a/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/filter/QualifierFilterEntity.java
+++ 
b/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/filter/QualifierFilterEntity.java
@@ -24,82 +24,86 @@ import java.io.DataInput;
 import java.io.DataOutput;
 import java.io.IOException;
 
-public class QualifierFilterEntity implements Writable{
-       public String key;
-       public String value;
-       public ComparisonOperator op;
-       public TokenType valueType;
-       public TokenType keyType;
-
-       public QualifierFilterEntity(){}
-       public QualifierFilterEntity(String key, String value, 
ComparisonOperator comp, TokenType keyType, TokenType valueType) {
-               super();
-               this.key = key;
-               this.value = value;
-               this.op = comp;
-               this.keyType = keyType;
-               this.valueType = valueType;
-       }
-
-       public String getKey() {
-               return key;
-       }
-
-       public void setKey(String key) {
-               this.key = key;
-       }
-
-       public String getValue() {
-               return value;
-       }
-
-       public void setValue(String value) {
-               this.value = value;
-       }
-
-       public ComparisonOperator getOp() {
-               return op;
-       }
-
-       public void setOp(ComparisonOperator op) {
-               this.op = op;
-       }
-
-       public TokenType getValueType() {
-               return valueType;
-       }
-
-       public void setValueType(TokenType valueType) {
-               this.valueType = valueType;
-       }
-
-       public void setKeyType(TokenType keyType){
-               this.keyType = keyType;
-       }
-       public TokenType getKeyType(){
-               return this.keyType;
-       }
-
-       @Override
-       public String toString() {
-               return String.format("%s %s %s",this.key,this.op,this.value);
-       }
-
-       @Override
-       public void write(DataOutput out) throws IOException {
-               out.writeUTF(this.key);
-               out.writeUTF(this.getValue());
-               out.writeUTF(this.op.name());
-               out.writeUTF(this.keyType.name());
-               out.writeUTF(this.valueType.name());
-       }
-
-       @Override
-       public void readFields(DataInput in) throws IOException {
-               this.key = in.readUTF();
-               this.value = in.readUTF();
-               this.op = ComparisonOperator.valueOf(in.readUTF());
-               this.keyType = TokenType.valueOf(in.readUTF());
-               this.valueType = TokenType.valueOf(in.readUTF());
-       }
-}
\ No newline at end of file
+public class QualifierFilterEntity implements Writable {
+    public String key;
+    public String value;
+    public ComparisonOperator op;
+    public TokenType valueType;
+    public TokenType keyType;
+
+    public QualifierFilterEntity() {
+    }
+
+    public QualifierFilterEntity(String key, String value, ComparisonOperator 
comp, TokenType keyType,
+                                 TokenType valueType) {
+        super();
+        this.key = key;
+        this.value = value;
+        this.op = comp;
+        this.keyType = keyType;
+        this.valueType = valueType;
+    }
+
+    public String getKey() {
+        return key;
+    }
+
+    public void setKey(String key) {
+        this.key = key;
+    }
+
+    public String getValue() {
+        return value;
+    }
+
+    public void setValue(String value) {
+        this.value = value;
+    }
+
+    public ComparisonOperator getOp() {
+        return op;
+    }
+
+    public void setOp(ComparisonOperator op) {
+        this.op = op;
+    }
+
+    public TokenType getValueType() {
+        return valueType;
+    }
+
+    public void setValueType(TokenType valueType) {
+        this.valueType = valueType;
+    }
+
+    public void setKeyType(TokenType keyType) {
+        this.keyType = keyType;
+    }
+
+    public TokenType getKeyType() {
+        return this.keyType;
+    }
+
+    @Override
+    public String toString() {
+        return String.format("%s %s %s", this.key, this.op, this.value);
+    }
+
+    @Override
+    public void write(DataOutput out) throws IOException {
+        out.writeUTF(this.key);
+        out.writeUTF(this.getValue());
+        out.writeUTF(this.op.name());
+        out.writeUTF(this.keyType.name());
+        out.writeUTF(this.valueType.name());
+    }
+
+    @Override
+    public void readFields(DataInput in) throws IOException {
+        this.key = in.readUTF();
+        this.value = in.readUTF();
+        this.op = ComparisonOperator.valueOf(in.readUTF());
+        this.keyType = TokenType.valueOf(in.readUTF());
+        this.valueType = TokenType.valueOf(in.readUTF());
+    }
+}

http://git-wip-us.apache.org/repos/asf/eagle/blob/6e919c2e/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/filter/RowValueFilter.java
----------------------------------------------------------------------
diff --git 
a/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/filter/RowValueFilter.java
 
b/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/filter/RowValueFilter.java
index a4b97ea..91a6939 100755
--- 
a/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/filter/RowValueFilter.java
+++ 
b/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/filter/RowValueFilter.java
@@ -33,32 +33,34 @@ import java.io.IOException;
 import java.util.List;
 
 /**
- * TODO: Critical performance problem!!!
- * TODO: Refactor to specified multi-column filter so that avoid return all 
qualifier columns from region server to client side
+ * TODO: Critical performance problem!!! TODO: Refactor into a dedicated 
multi-column filter to avoid returning
+ * all qualifier columns from the region server to the client side
  *
  * @since 2014/11/17
  */
 public class RowValueFilter extends FilterBase {
-    private final static Logger LOG = 
LoggerFactory.getLogger(RowValueFilter.class);
+    private static final Logger LOG = 
LoggerFactory.getLogger(RowValueFilter.class);
     private boolean filterOutRow = false;
     private WritableComparable<List<KeyValue>> comparator;
 
     // TODO: Use qualifiers to reduce network tranfer
-//    private List<byte[]> qualifiers;
-    public RowValueFilter(){}
+    // private List<byte[]> qualifiers;
+    public RowValueFilter() {
+    }
 
     /**
      * Filter out row if WritableComparable.compareTo return 0
+     *
      * @param comparator <code>WritableComparable[List[KeyValue]]</code>
      */
-    public RowValueFilter(WritableComparable<List<KeyValue>> comparator){
+    public RowValueFilter(WritableComparable<List<KeyValue>> comparator) {
         this.comparator = comparator;
     }
 
-//    public RowValueFilter(List<byte[]> 
qualifiers,WritableComparable<List<KeyValue>> comparator){
-//        this.qualifiers = qualifiers;
-//        this.comparator = comparator;
-//    }
+    // public RowValueFilter(List<byte[]> 
qualifiers,WritableComparable<List<KeyValue>> comparator){
+    // this.qualifiers = qualifiers;
+    // this.comparator = comparator;
+    // }
 
     /**
      * Old interface in hbase-0.94
@@ -77,7 +79,7 @@ public class RowValueFilter extends FilterBase {
      * @param in
      * @throws IOException
      */
-//    @Override
+    // @Override
     @Deprecated
     public void readFields(DataInput in) throws IOException {
         this.comparator = new BooleanExpressionComparator();
@@ -85,7 +87,8 @@ public class RowValueFilter extends FilterBase {
     }
 
     /**
-     * TODO: Currently still use older serialization method from hbase-0.94, 
need to migrate into ProtoBuff based
+     * TODO: Currently still uses the older serialization method from hbase-0.94; 
needs to migrate to a Protobuf
+     * based one
      *
      * @return
      * @throws IOException
@@ -98,23 +101,24 @@ public class RowValueFilter extends FilterBase {
     }
 
     /**
-     * TODO: Currently still use older serialization method from hbase-0.94, 
need to migrate into ProtoBuff based
+     * TODO: Currently still uses the older serialization method from hbase-0.94; 
needs to migrate to a Protobuf
+     * based one
      */
     // Override static method
-    public static Filter parseFrom(final byte [] pbBytes) throws 
DeserializationException {
+    public static Filter parseFrom(final byte[] pbBytes) throws 
DeserializationException {
         ByteArrayDataInput byteArrayDataInput = 
ByteStreams.newDataInput(pbBytes);
         RowValueFilter filter = new RowValueFilter();
         try {
             filter.readFields(byteArrayDataInput);
         } catch (IOException e) {
-            LOG.error("Got error to deserialize RowValueFilter from PB 
bytes",e);
+            LOG.error("Got error to deserialize RowValueFilter from PB bytes", 
e);
             throw new DeserializationException(e);
         }
         return filter;
     }
 
     @Override
-    public boolean hasFilterRow(){
+    public boolean hasFilterRow() {
         return true;
     }
 
@@ -124,21 +128,21 @@ public class RowValueFilter extends FilterBase {
     }
 
     @Override
-    public void reset() {
-        this.filterOutRow = false;
+    public boolean filterRow() {
+        return filterOutRow;
     }
 
     @Override
-    public boolean filterRow(){
-        return filterOutRow;
+    public void reset() {
+        this.filterOutRow = false;
     }
 
     @Override
     public String toString() {
-        return super.toString()+" ( "+this.comparator.toString()+" )";
+        return super.toString() + " ( " + this.comparator.toString() + " )";
     }
 
-//    public List<byte[]> getQualifiers() {
-//        return qualifiers;
-//    }
-}
\ No newline at end of file
+    // public List<byte[]> getQualifiers() {
+    // return qualifiers;
+    // }
+}

http://git-wip-us.apache.org/repos/asf/eagle/blob/6e919c2e/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/filter/TypedByteArrayComparator.java
----------------------------------------------------------------------
diff --git 
a/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/filter/TypedByteArrayComparator.java
 
b/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/filter/TypedByteArrayComparator.java
index ecaf8cc..74a13c1 100755
--- 
a/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/filter/TypedByteArrayComparator.java
+++ 
b/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/filter/TypedByteArrayComparator.java
@@ -32,15 +32,12 @@ import java.util.HashMap;
 import java.util.Map;
 
 /**
- * <h1>TypedByteArrayComparator</h1>
- *
- * Compare byte array: <code>byte[] value</code> with class type: <code>Class 
type</code>
- *
- * <br/>
+ * <h1>TypedByteArrayComparator</h1> Compare byte array: <code>byte[] 
value</code> with class type:
+ * <code>Class type</code> <br/>
  * <br/>
  * Built-in support:
  *
- *  <pre>
+ * <pre>
  *    Double
  *    double
  *    Integer
@@ -51,14 +48,14 @@ import java.util.Map;
  *    short
  *    Boolean
  *    boolean
- *  </pre>
+ * </pre>
  *
- *  And can be extend by defining new {@link RawComparator} and register with  
{@link #define(Class type, RawComparator comparator)}
- * <br/>
+ * Can be extended by defining a new {@link RawComparator} and registering it with
+ * {@link #define(Class type, RawComparator comparator)} <br/>
  * <br/>
  */
 public class TypedByteArrayComparator extends ByteArrayComparable {
-    private final static Logger LOG = 
LoggerFactory.getLogger(TypedByteArrayComparator.class);
+    private static final Logger LOG = 
LoggerFactory.getLogger(TypedByteArrayComparator.class);
 
     private Class type;
 
@@ -69,34 +66,38 @@ public class TypedByteArrayComparator extends 
ByteArrayComparable {
      * Default constructor for writable
      */
     @SuppressWarnings("unused")
-    public TypedByteArrayComparator(){
+    public TypedByteArrayComparator() {
         super(null);
     }
 
-    public TypedByteArrayComparator(byte[] value, Class type){
+    public TypedByteArrayComparator(byte[] value, Class type) {
         super(value);
         this.type = type;
         this.comparator = get(this.type);
-        if(this.comparator == null) throw new IllegalArgumentException("No 
comparator found for class: "+type);
+        if (this.comparator == null) {
+            throw new IllegalArgumentException("No comparator found for class: 
" + type);
+        }
     }
 
     /**
      * @param in hbase-0.94 interface
      * @throws IOException
      */
-//    @Override
+    // @Override
     public void readFields(DataInput in) throws IOException {
-//        super.readFields(in);
+        // super.readFields(in);
         try {
             String _type = in.readUTF();
             type = _primitiveTypeClassMap.get(_type);
-            if(type == null) {
+            if (type == null) {
                 type = Class.forName(_type);
             }
             comparator = get(type);
-            if(comparator == null) throw new IllegalArgumentException("No 
comparator found for class: "+type);
+            if (comparator == null) {
+                throw new IllegalArgumentException("No comparator found for 
class: " + type);
+            }
         } catch (ClassNotFoundException e) {
-            throw new IOException(e.getMessage(),e);
+            throw new IOException(e.getMessage(), e);
         }
     }
 
@@ -104,9 +105,9 @@ public class TypedByteArrayComparator extends 
ByteArrayComparable {
      * @param out hbase-0.94 interface
      * @throws IOException
      */
-//    @Override
+    // @Override
     public void write(DataOutput out) throws IOException {
-//        super.write(out);
+        // super.write(out);
         String typeName = type.getName();
         out.writeUTF(typeName);
     }
@@ -123,7 +124,7 @@ public class TypedByteArrayComparator extends 
ByteArrayComparable {
             this.write(byteArrayDataOutput);
             return byteArrayDataOutput.toByteArray();
         } catch (IOException e) {
-            LOG.error("Failed to serialize due to: "+e.getMessage(),e);
+            LOG.error("Failed to serialize due to: " + e.getMessage(), e);
             throw new RuntimeException(e);
         }
     }
@@ -135,14 +136,13 @@ public class TypedByteArrayComparator extends 
ByteArrayComparable {
      * @return Comparator instance
      * @throws DeserializationException
      */
-    public static TypedByteArrayComparator parseFrom(final byte [] bytes)
-            throws DeserializationException {
+    public static TypedByteArrayComparator parseFrom(final byte[] bytes) 
throws DeserializationException {
         TypedByteArrayComparator comparator = new TypedByteArrayComparator();
         ByteArrayDataInput byteArrayDataInput = 
ByteStreams.newDataInput(bytes);
         try {
             comparator.readFields(byteArrayDataInput);
         } catch (IOException e) {
-            LOG.error("Got error to deserialize TypedByteArrayComparator from 
PB bytes",e);
+            LOG.error("Got error to deserialize TypedByteArrayComparator from 
PB bytes", e);
             throw new DeserializationException(e);
         }
         return comparator;
@@ -158,33 +158,35 @@ public class TypedByteArrayComparator extends 
ByteArrayComparable {
      * <li>Try registered comparator</li>
      * <li>If not found, try all possible WritableComparator</li>
      * </ol>
-     *
      * If not found finally, throw new IllegalArgumentException("unable to get 
comparator for class: "+type);
      *
      * @param type value type class
      * @return RawComparator
      */
-    public static RawComparator get(Class type){
+    public static RawComparator get(Class type) {
         RawComparator comparator = null;
         try {
             comparator = _typedClassComparator.get(type);
-        }catch (ClassCastException ex){
+        } catch (ClassCastException ex) {
             // ignore
         }
         try {
-            if (comparator == null) comparator = WritableComparator.get(type);
-        }catch (ClassCastException ex){
+            if (comparator == null) {
+                comparator = WritableComparator.get(type);
+            }
+        } catch (ClassCastException ex) {
             // ignore
         }
         return comparator;
     }
 
-    private final static Map<Class,RawComparator> _typedClassComparator = new 
HashMap<Class, RawComparator>();
-    public static void define(Class type, RawComparator comparator){
-        _typedClassComparator.put(type,comparator);
+    private static final Map<Class, RawComparator> _typedClassComparator = new 
HashMap<Class, RawComparator>();
+
+    public static void define(Class type, RawComparator comparator) {
+        _typedClassComparator.put(type, comparator);
     }
 
-    static{
+    static {
         define(Double.class, WritableComparator.get(DoubleWritable.class));
         define(double.class, WritableComparator.get(DoubleWritable.class));
         define(Integer.class, WritableComparator.get(IntWritable.class));
@@ -200,14 +202,15 @@ public class TypedByteArrayComparator extends 
ByteArrayComparable {
     /**
      * Because {@link Class#forName } can't find class for primitive type
      */
-    private final static Map<String,Class> _primitiveTypeClassMap = new 
HashMap<String, Class>();
+    private static final Map<String, Class> _primitiveTypeClassMap = new 
HashMap<String, Class>();
+
     static {
-        _primitiveTypeClassMap.put(int.class.getName(),int.class);
-        _primitiveTypeClassMap.put(double.class.getName(),double.class);
-        _primitiveTypeClassMap.put(long.class.getName(),long.class);
-        _primitiveTypeClassMap.put(short.class.getName(),short.class);
-        _primitiveTypeClassMap.put(boolean.class.getName(),boolean.class);
-        _primitiveTypeClassMap.put(char.class.getName(),char.class);
-        _primitiveTypeClassMap.put(byte.class.getName(),byte.class);
+        _primitiveTypeClassMap.put(int.class.getName(), int.class);
+        _primitiveTypeClassMap.put(double.class.getName(), double.class);
+        _primitiveTypeClassMap.put(long.class.getName(), long.class);
+        _primitiveTypeClassMap.put(short.class.getName(), short.class);
+        _primitiveTypeClassMap.put(boolean.class.getName(), boolean.class);
+        _primitiveTypeClassMap.put(char.class.getName(), char.class);
+        _primitiveTypeClassMap.put(byte.class.getName(), byte.class);
     }
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/eagle/blob/6e919c2e/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/index/IndexLogReader.java
----------------------------------------------------------------------
diff --git 
a/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/index/IndexLogReader.java
 
b/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/index/IndexLogReader.java
index 418ab33..7a32077 100755
--- 
a/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/index/IndexLogReader.java
+++ 
b/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/index/IndexLogReader.java
@@ -24,22 +24,24 @@ import 
org.apache.hadoop.hbase.filter.SingleColumnValueFilter;
 
 public abstract class IndexLogReader implements LogReader {
 
-       // TODO: Work around https://issues.apache.org/jira/browse/HBASE-2198. 
More graceful implementation should use SingleColumnValueExcludeFilter, 
-       // but it's complicated in current implementation. 
-       protected static void workaroundHBASE2198(Get get, Filter 
filter,byte[][] qualifiers) {
-               if (filter instanceof SingleColumnValueFilter) {
-                       if(qualifiers == null) {
-                               get.addFamily(((SingleColumnValueFilter) 
filter).getFamily());
-                       }else{
-                               get.addColumn(((SingleColumnValueFilter) 
filter).getFamily(), ((SingleColumnValueFilter) filter).getQualifier());
-                       }
-                       return;
-               }
-               if (filter instanceof FilterList) {
-                       for (Filter f : ((FilterList)filter).getFilters()) {
-                               workaroundHBASE2198(get, f,qualifiers);
-                       }
-               }
-       }
+    // TODO: Work around https://issues.apache.org/jira/browse/HBASE-2198. 
More graceful implementation should
+    // use SingleColumnValueExcludeFilter,
+    // but it's complicated in current implementation.
+    protected static void workaroundHBASE2198(Get get, Filter filter, byte[][] 
qualifiers) {
+        if (filter instanceof SingleColumnValueFilter) {
+            if (qualifiers == null) {
+                get.addFamily(((SingleColumnValueFilter)filter).getFamily());
+            } else {
+                get.addColumn(((SingleColumnValueFilter)filter).getFamily(),
+                              
((SingleColumnValueFilter)filter).getQualifier());
+            }
+            return;
+        }
+        if (filter instanceof FilterList) {
+            for (Filter f : ((FilterList)filter).getFilters()) {
+                workaroundHBASE2198(get, f, qualifiers);
+            }
+        }
+    }
 
 }

http://git-wip-us.apache.org/repos/asf/eagle/blob/6e919c2e/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/index/IndexStreamReader.java
----------------------------------------------------------------------
diff --git 
a/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/index/IndexStreamReader.java
 
b/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/index/IndexStreamReader.java
index 9e059f2..579755f 100755
--- 
a/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/index/IndexStreamReader.java
+++ 
b/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/index/IndexStreamReader.java
@@ -26,69 +26,69 @@ import org.slf4j.LoggerFactory;
 import java.io.IOException;
 import java.util.List;
 
-public abstract class IndexStreamReader  extends StreamReader {
-       protected final IndexDefinition indexDef;
-       protected final SearchCondition condition;
-       protected final List<byte[]> indexRowkeys;
-       protected LogReader<InternalLog> reader;
-       protected long lastTimestamp = 0;
-       protected long firstTimestamp = 0;
-       
-       protected static final Logger LOG = 
LoggerFactory.getLogger(IndexStreamReader.class);
+public abstract class IndexStreamReader extends StreamReader {
+    protected final IndexDefinition indexDef;
+    protected final SearchCondition condition;
+    protected final List<byte[]> indexRowkeys;
+    protected LogReader<InternalLog> reader;
+    protected long lastTimestamp = 0;
+    protected long firstTimestamp = 0;
 
-       public IndexStreamReader(IndexDefinition indexDef, SearchCondition 
condition, List<byte[]> indexRowkeys) {
-               this.indexDef = indexDef;
-               this.condition = condition;
-               this.indexRowkeys = indexRowkeys;
-               this.reader = null;
-       }
+    protected static final Logger LOG = 
LoggerFactory.getLogger(IndexStreamReader.class);
 
-       @Override
-       public long getLastTimestamp() {
-               return lastTimestamp;
-       }
+    public IndexStreamReader(IndexDefinition indexDef, SearchCondition 
condition, List<byte[]> indexRowkeys) {
+        this.indexDef = indexDef;
+        this.condition = condition;
+        this.indexRowkeys = indexRowkeys;
+        this.reader = null;
+    }
 
-       @Override
-       public long getFirstTimestamp() {
-               return this.firstTimestamp;
-       }
+    @Override
+    public long getLastTimestamp() {
+        return lastTimestamp;
+    }
 
-       @Override
-       public void readAsStream() throws Exception {
-               if (reader == null) {
-                       reader = createIndexReader();
-               }
-               final EntityDefinition entityDef = 
indexDef.getEntityDefinition();
-               try{
-                       reader.open();
-                       InternalLog log;
-                       int count = 0;
-                       while ((log = reader.read()) != null) {
-                               TaggedLogAPIEntity entity = 
HBaseInternalLogHelper.buildEntity(log, entityDef);
-                               
entity.setSerializeAlias(condition.getOutputAlias());
-                               
entity.setSerializeVerbose(condition.isOutputVerbose());
+    @Override
+    public long getFirstTimestamp() {
+        return this.firstTimestamp;
+    }
 
-                               if (lastTimestamp == 0 || lastTimestamp < 
entity.getTimestamp()) {
-                                       lastTimestamp = entity.getTimestamp();
-                               }
-                               if(firstTimestamp == 0 || firstTimestamp > 
entity.getTimestamp()){
-                                       firstTimestamp = entity.getTimestamp();
-                               }
-                               for(EntityCreationListener l : _listeners){
-                                       l.entityCreated(entity);
-                               }
-                               if(++count == condition.getPageSize()) {
-                                       break;
-                               }
-                       }
-               }catch(IOException ioe){
-                       LOG.error("Fail reading log", ioe);
-                       throw ioe;
-               }finally{
-                       reader.close();
-               }               
-       }
+    @Override
+    public void readAsStream() throws Exception {
+        if (reader == null) {
+            reader = createIndexReader();
+        }
+        final EntityDefinition entityDef = indexDef.getEntityDefinition();
+        try {
+            reader.open();
+            InternalLog log;
+            int count = 0;
+            while ((log = reader.read()) != null) {
+                TaggedLogAPIEntity entity = 
HBaseInternalLogHelper.buildEntity(log, entityDef);
+                entity.setSerializeAlias(condition.getOutputAlias());
+                entity.setSerializeVerbose(condition.isOutputVerbose());
+
+                if (lastTimestamp == 0 || lastTimestamp < 
entity.getTimestamp()) {
+                    lastTimestamp = entity.getTimestamp();
+                }
+                if (firstTimestamp == 0 || firstTimestamp > 
entity.getTimestamp()) {
+                    firstTimestamp = entity.getTimestamp();
+                }
+                for (EntityCreationListener l : listeners) {
+                    l.entityCreated(entity);
+                }
+                if (++count == condition.getPageSize()) {
+                    break;
+                }
+            }
+        } catch (IOException ioe) {
+            LOG.error("Fail reading log", ioe);
+            throw ioe;
+        } finally {
+            reader.close();
+        }
+    }
+
+    protected abstract LogReader createIndexReader();
 
-       protected abstract LogReader createIndexReader();
-       
 }

http://git-wip-us.apache.org/repos/asf/eagle/blob/6e919c2e/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/index/NonClusteredIndexLogReader.java
----------------------------------------------------------------------
diff --git 
a/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/index/NonClusteredIndexLogReader.java
 
b/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/index/NonClusteredIndexLogReader.java
index e6a5c96..4daf695 100755
--- 
a/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/index/NonClusteredIndexLogReader.java
+++ 
b/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/index/NonClusteredIndexLogReader.java
@@ -28,170 +28,172 @@ import java.io.IOException;
 import java.util.ArrayList;
 import java.util.List;
 
-
 public class NonClusteredIndexLogReader extends IndexLogReader {
-       private final IndexDefinition indexDef;
-       private final List<byte[]> indexRowkeys;
-       private final byte[][] qualifiers;
-       private final Filter filter;
-       private HTableInterface tbl;
-       private boolean isOpen = false;
-       private Result[] results;
-       private int index = -1;
-       private final List<Scan> scans;
-       private int currentScanIndex = 0;
-       private ResultScanner currentResultScanner;
-
-       // Max tag key/value. 
-       private static final byte[] MAX_TAG_VALUE_BYTES = {(byte) 0XFF,(byte) 
0XFF,(byte) 0XFF,(byte) 0XFF,(byte) 0XFF,(byte) 0XFF,(byte) 0XFF,(byte) 
0XFF,(byte) 0XFF};
-       private static final int BATCH_MULTIGET_SIZE = 1000;
-
-       public NonClusteredIndexLogReader(IndexDefinition indexDef, 
List<byte[]> indexRowkeys, byte[][] qualifiers, Filter filter) {
-               this.indexDef = indexDef;
-               this.indexRowkeys = indexRowkeys;
-               this.qualifiers = qualifiers;
-               this.filter = filter;
-               this.scans = buildScans();
-       }
-       
-
-       private List<Scan> buildScans() {
-               final ArrayList<Scan> result = new 
ArrayList<Scan>(indexRowkeys.size());
-               for (byte[] rowkey : indexRowkeys) {
-                       Scan s = new Scan();
-                       s.setStartRow(rowkey);
-                       // In rowkey the tag key/value is sorted by the hash 
code of the key, so MAX_TAG_VALUE_BYTES is enough as the end key
-                       final byte[] stopRowkey = ByteUtil.concat(rowkey, 
MAX_TAG_VALUE_BYTES);
-                       s.setStopRow(stopRowkey);
-                       // TODO the # of cached rows should be minimum of 
(pagesize and 100)
-                       int cs = 
EagleConfigFactory.load().getHBaseClientScanCacheSize();
-                       s.setCaching(cs);
-                       // TODO not optimized for all applications
-                       s.setCacheBlocks(true);
-                       // scan specified columnfamily for all qualifiers
-                       
s.addFamily(indexDef.getEntityDefinition().getColumnFamily().getBytes());
-                       result.add(s);
-               }
-               return result;
-       }
-
-       @Override
-       public void open() throws IOException {
-               if (isOpen)
-                       return; // silently return
-               try {
-                       tbl = 
EagleConfigFactory.load().getHTable(indexDef.getEntityDefinition().getTable());
-               } catch (RuntimeException ex) {
-                       throw new IOException(ex);
-               }
-               currentScanIndex = 0;
-               openNewScan();
-               fillResults();
-       }
-
-       private boolean openNewScan() throws IOException {
-               closeCurrentScanResult();
-               if (currentScanIndex >= scans.size()) {
-                       return false;
-               }
-               final Scan scan = scans.get(currentScanIndex++);
-               currentResultScanner = tbl.getScanner(scan);
-               return true;
-       }
-
-       private void fillResults() throws IOException {
-               if (currentResultScanner == null) {
-                       return;
-               }
-               index = 0;
-               int count = 0;
-               Result r = null;
+    private final IndexDefinition indexDef;
+    private final List<byte[]> indexRowkeys;
+    private final byte[][] qualifiers;
+    private final Filter filter;
+    private HTableInterface tbl;
+    private boolean isOpen = false;
+    private Result[] results;
+    private int index = -1;
+    private final List<Scan> scans;
+    private int currentScanIndex = 0;
+    private ResultScanner currentResultScanner;
+
+    // Max tag key/value.
+    private static final byte[] MAX_TAG_VALUE_BYTES = {
+        (byte)0XFF, (byte)0XFF, (byte)0XFF, (byte)0XFF,
+        (byte)0XFF, (byte)0XFF, (byte)0XFF, (byte)0XFF,
+        (byte)0XFF
+    };
+    private static final int BATCH_MULTIGET_SIZE = 1000;
+
+    public NonClusteredIndexLogReader(IndexDefinition indexDef, List<byte[]> 
indexRowkeys,
+                                      byte[][] qualifiers, Filter filter) {
+        this.indexDef = indexDef;
+        this.indexRowkeys = indexRowkeys;
+        this.qualifiers = qualifiers;
+        this.filter = filter;
+        this.scans = buildScans();
+    }
+
+    private List<Scan> buildScans() {
+        final ArrayList<Scan> result = new 
ArrayList<Scan>(indexRowkeys.size());
+        for (byte[] rowkey : indexRowkeys) {
+            Scan s = new Scan();
+            s.setStartRow(rowkey);
+            // In rowkey the tag key/value is sorted by the hash code of the 
key, so MAX_TAG_VALUE_BYTES is
+            // enough as the end key
+            final byte[] stopRowkey = ByteUtil.concat(rowkey, 
MAX_TAG_VALUE_BYTES);
+            s.setStopRow(stopRowkey);
+            // TODO the # of cached rows should be minimum of (pagesize and 
100)
+            int cs = EagleConfigFactory.load().getHBaseClientScanCacheSize();
+            s.setCaching(cs);
+            // TODO not optimized for all applications
+            s.setCacheBlocks(true);
+            // scan specified columnfamily for all qualifiers
+            
s.addFamily(indexDef.getEntityDefinition().getColumnFamily().getBytes());
+            result.add(s);
+        }
+        return result;
+    }
+
+    @Override
+    public void open() throws IOException {
+        if (isOpen) {
+            return; // silently return
+        }
+        try {
+            tbl = 
EagleConfigFactory.load().getHTable(indexDef.getEntityDefinition().getTable());
+        } catch (RuntimeException ex) {
+            throw new IOException(ex);
+        }
+        currentScanIndex = 0;
+        openNewScan();
+        fillResults();
+    }
+
+    private boolean openNewScan() throws IOException {
+        closeCurrentScanResult();
+        if (currentScanIndex >= scans.size()) {
+            return false;
+        }
+        final Scan scan = scans.get(currentScanIndex++);
+        currentResultScanner = tbl.getScanner(scan);
+        return true;
+    }
+
+    private void fillResults() throws IOException {
+        if (currentResultScanner == null) {
+            return;
+        }
+        index = 0;
+        int count = 0;
+        Result r = null;
         final List<Get> gets = new ArrayList<Get>(BATCH_MULTIGET_SIZE);
-               final byte[] family = 
indexDef.getEntityDefinition().getColumnFamily().getBytes();
-               while (count < BATCH_MULTIGET_SIZE) {
-                       r = currentResultScanner.next();
-                       if (r == null) {
-                               if (openNewScan()) {
-                                       continue;
-                               } else {
-                                       break;
-                               }
-                       }
-                       for (byte[] rowkey : r.getFamilyMap(family).keySet()) {
-                               if (rowkey.length == 0) {       // invalid 
rowkey
-                                       continue;
-                               }
-                               final Get get = new Get(rowkey);
+        final byte[] family = 
indexDef.getEntityDefinition().getColumnFamily().getBytes();
+        while (count < BATCH_MULTIGET_SIZE) {
+            r = currentResultScanner.next();
+            if (r == null) {
+                if (openNewScan()) {
+                    continue;
+                } else {
+                    break;
+                }
+            }
+            for (byte[] rowkey : r.getFamilyMap(family).keySet()) {
+                if (rowkey.length == 0) { // invalid rowkey
+                    continue;
+                }
+                final Get get = new Get(rowkey);
                 if (filter != null) {
-                       get.setFilter(filter);
+                    get.setFilter(filter);
                 }
-                               if(qualifiers != null) {
-                                       for (int j = 0; j < qualifiers.length; 
++j) {
-                                               // Return the specified 
qualifiers
-                                               get.addColumn(family, 
qualifiers[j]);
-                                       }
-                               }else {
-                                       get.addFamily(family);
-                               }
-                       workaroundHBASE2198(get, filter,qualifiers);
-                               gets.add(get);
-                               ++count;
-                       }
-               }
-               if (count == 0) {
-                       results = null;
-                       return;
-               }
-               results = tbl.get(gets);
-               if (results == null || results.length == 0) {
-                       fillResults();
-               }
-       }
-
-
-       private void closeCurrentScanResult() {
-               if (currentResultScanner != null) {
-                       currentResultScanner.close();
-                       currentResultScanner = null;
-               }
-       }
-
-
-       @Override
-       public void close() throws IOException {
-               if(tbl != null){
-                       new HTableFactory().releaseHTableInterface(tbl);
-               }
-               closeCurrentScanResult();
-       }
-
-       @Override
-       public InternalLog read() throws IOException {
-               if (tbl == null) {
-                       throw new IllegalArgumentException("Haven't open before 
reading");
-               }
-               
-               Result r = null;
-               InternalLog t = null;
-               while ((r = getNextResult()) != null) {
-                       if (r.getRow() == null) {
-                               continue;
-                       }
-                       t = 
HBaseInternalLogHelper.parse(indexDef.getEntityDefinition(), r, qualifiers);
-                       break;
-               }
-               return t;
-       }
-
+                if (qualifiers != null) {
+                    for (int j = 0; j < qualifiers.length; ++j) {
+                        // Return the specified qualifiers
+                        get.addColumn(family, qualifiers[j]);
+                    }
+                } else {
+                    get.addFamily(family);
+                }
+                workaroundHBASE2198(get, filter, qualifiers);
+                gets.add(get);
+                ++count;
+            }
+        }
+        if (count == 0) {
+            results = null;
+            return;
+        }
+        results = tbl.get(gets);
+        if (results == null || results.length == 0) {
+            fillResults();
+        }
+    }
+
+    private void closeCurrentScanResult() {
+        if (currentResultScanner != null) {
+            currentResultScanner.close();
+            currentResultScanner = null;
+        }
+    }
+
+    @Override
+    public void close() throws IOException {
+        if (tbl != null) {
+            new HTableFactory().releaseHTableInterface(tbl);
+        }
+        closeCurrentScanResult();
+    }
+
+    @Override
+    public InternalLog read() throws IOException {
+        if (tbl == null) {
+            throw new IllegalArgumentException("Haven't open before reading");
+        }
+
+        Result r = null;
+        InternalLog t = null;
+        while ((r = getNextResult()) != null) {
+            if (r.getRow() == null) {
+                continue;
+            }
+            t = HBaseInternalLogHelper.parse(indexDef.getEntityDefinition(), 
r, qualifiers);
+            break;
+        }
+        return t;
+    }
+
+    private Result getNextResult() throws IOException {
+        if (results == null || results.length == 0 || index >= results.length) 
{
+            fillResults();
+        }
+        if (results == null || results.length == 0 || index >= results.length) 
{
+            return null;
+        }
+        return results[index++];
+    }
 
-       private Result getNextResult() throws IOException {
-               if (results == null || results.length == 0 || index >= 
results.length) {
-                       fillResults();
-               }
-               if (results == null || results.length == 0 || index >= 
results.length) {
-                       return null;
-               }
-               return results[index++];
-       }
-       
 }

http://git-wip-us.apache.org/repos/asf/eagle/blob/6e919c2e/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/index/NonClusteredIndexStreamReader.java
----------------------------------------------------------------------
diff --git 
a/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/index/NonClusteredIndexStreamReader.java
 
b/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/index/NonClusteredIndexStreamReader.java
index ec5631a..8df2773 100755
--- 
a/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/index/NonClusteredIndexStreamReader.java
+++ 
b/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/index/NonClusteredIndexStreamReader.java
@@ -27,25 +27,29 @@ import java.util.ArrayList;
 import java.util.List;
 
 public class NonClusteredIndexStreamReader extends IndexStreamReader {
-       public NonClusteredIndexStreamReader(IndexDefinition indexDef, SearchCondition condition) {
-               super(indexDef, condition, new ArrayList<byte[]>());
-               final IndexType type = indexDef.canGoThroughIndex(condition.getQueryExpression(), indexRowkeys);
-               if (!IndexType.NON_CLUSTER_INDEX.equals(type)) {
-                       throw new IllegalArgumentException("This query can't go through index: " + condition.getQueryExpression());
-               }
-       }
+    public NonClusteredIndexStreamReader(IndexDefinition indexDef, SearchCondition condition) {
+        super(indexDef, condition, new ArrayList<byte[]>());
+        final IndexType type = indexDef.canGoThroughIndex(condition.getQueryExpression(), indexRowkeys);
+        if (!IndexType.NON_CLUSTER_INDEX.equals(type)) {
+            throw new IllegalArgumentException("This query can't go through index: "
+                                               + condition.getQueryExpression());
+        }
+    }
 
-       public NonClusteredIndexStreamReader(IndexDefinition indexDef, SearchCondition condition, List<byte[]> indexRowkeys) {
-               super(indexDef, condition, indexRowkeys);
-       }
+    public NonClusteredIndexStreamReader(IndexDefinition indexDef, SearchCondition condition,
+                                         List<byte[]> indexRowkeys) {
+        super(indexDef, condition, indexRowkeys);
+    }
 
-       @Override
-       protected LogReader createIndexReader() {
-               final EntityDefinition entityDef = indexDef.getEntityDefinition();
-               byte[][] outputQualifiers = null;
-               if(!condition.isOutputAll()) {
-                       outputQualifiers = HBaseInternalLogHelper.getOutputQualifiers(entityDef, condition.getOutputFields());
-               }
-               return new NonClusteredIndexLogReader(indexDef, indexRowkeys, outputQualifiers, condition.getFilter());
-       }
+    @Override
+    protected LogReader createIndexReader() {
+        final EntityDefinition entityDef = indexDef.getEntityDefinition();
+        byte[][] outputQualifiers = null;
+        if (!condition.isOutputAll()) {
+            outputQualifiers = HBaseInternalLogHelper.getOutputQualifiers(entityDef,
+                                                                          condition.getOutputFields());
+        }
+        return new NonClusteredIndexLogReader(indexDef, indexRowkeys, outputQualifiers,
+                                              condition.getFilter());
+    }
 }

Reply via email to