http://git-wip-us.apache.org/repos/asf/eagle/blob/6e919c2e/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/filter/HBaseFilterBuilder.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/filter/HBaseFilterBuilder.java b/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/filter/HBaseFilterBuilder.java index 8209445..9e736ae 100755 --- a/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/filter/HBaseFilterBuilder.java +++ b/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/filter/HBaseFilterBuilder.java @@ -34,552 +34,542 @@ import java.util.regex.Matcher; import java.util.regex.Pattern; /** - * the steps of building hbase filters - * 1. receive ORExpression from eagle-antlr - * 2. iterate all ANDExpression in ORExpression - * 2.1 put each ANDExpression to a new filter list with MUST_PASS_ONE option - * 2.2 iterate all AtomicExpression in ANDExpression - * 2.2.1 group AtomicExpression into 2 groups by looking up metadata, one is for tag filters, the other is for column filters - * 2.2.2 put the above 2 filters to a filter list with MUST_PASS_ALL option + * the steps of building hbase filters 1. receive ORExpression from eagle-antlr 2. iterate all ANDExpression + * in ORExpression 2.1 put each ANDExpression to a new filter list with MUST_PASS_ONE option 2.2 iterate all + * AtomicExpression in ANDExpression 2.2.1 group AtomicExpression into 2 groups by looking up metadata, one is + * for tag filters, the other is for column filters 2.2.2 put the above 2 filters to a filter list with + * MUST_PASS_ALL option */ public class HBaseFilterBuilder { - private static final Logger LOG = LoggerFactory.getLogger(HBaseFilterBuilder.class); - - /** - * syntax is @<fieldname> - */ -// private static final String fnRegex = "^@(.*)$"; - private static final Pattern _fnPattern = TokenConstant.ID_PATTERN;// Pattern.compile(fnRegex); - private static final Charset _defaultCharset = Charset.forName("ISO-8859-1"); - - private ORExpression _orExpr; - private EntityDefinition _ed; - private boolean _filterIfMissing; - private Charset _charset = _defaultCharset; - - /** - * TODO: Verify performance impact - * - * @return - */ - public Set<String> getFilterFields() { - return _filterFields; - } - - /** - * Just add filter fields for expression filter - */ - private Set<String> _filterFields; - - public HBaseFilterBuilder(EntityDefinition ed, ORExpression orExpr) { - this(ed, orExpr, false); - } - - public HBaseFilterBuilder(EntityDefinition ed, ORExpression orExpr, boolean filterIfMissing) { - this._ed = ed; - this._orExpr = orExpr; - this._filterIfMissing = filterIfMissing; - } - - public void setCharset(String charsetName){ - _charset = Charset.forName(charsetName); - } - - public Charset getCharset(){ - return _charset; - } - - /** - * Because we don't have metadata for tag, we regard non-qualifer field as tag. So one field possibly is not a real tag when this function return true. 
This happens - * when a user input an wrong field name which is neither tag or qualifier - * - * @param field - */ - private boolean isTag(String field){ - return _ed.isTag(field); - } - - /** - * check whether this field is one entity attribute or not - * @param fieldName - * @return - */ - private String parseEntityAttribute(String fieldName){ - Matcher m = _fnPattern.matcher(fieldName); - if(m.find()){ - return m.group(1); - } - return null; - } - - /** - * Return the partition values for each or expression. The size of the returned list should be equal to - * the size of FilterList that {@link #buildFilters()} returns. - * - * TODO: For now we don't support one query to query multiple partitions. In future if partition is defined, - * for the entity, internally We need to spawn multiple queries and send one query for each partition. - * - * @return Return the partition values for each or expression. Return null if the entity doesn't support - * partition - */ - public List<String[]> getPartitionValues() { - final String[] partitions = _ed.getPartitions(); - if (partitions == null || partitions.length == 0) { - return null; - } - final List<String[]> result = new ArrayList<String[]>(); - final Map<String, String> partitionKeyValueMap = new HashMap<String, String>(); - for(ANDExpression andExpr : _orExpr.getANDExprList()) { - partitionKeyValueMap.clear(); - for(AtomicExpression ae : andExpr.getAtomicExprList()) { - // TODO temporarily ignore those fields which are not for attributes - if(ae.getKeyType() == TokenType.ID) { - final String fieldName = parseEntityAttribute(ae.getKey()); - if (fieldName == null) { - LOG.warn(fieldName + " field does not have format @<FieldName>, ignored"); - continue; - } - if (_ed.isPartitionTag(fieldName) && ComparisonOperator.EQUAL.equals(ae.getOp())) { - final String value = ae.getValue(); - partitionKeyValueMap.put(fieldName, value); - } - } - } - final String[] values = new String[partitions.length]; - result.add(values); - for (int i = 0; i < partitions.length; ++i) { - final String partition = partitions[i]; - final String value = partitionKeyValueMap.get(partition); - values[i] = value; - } - } - return result; - } - - /** - * @see org.apache.eagle.query.parser.TokenType - * - * @return - */ - public FilterList buildFilters(){ - // TODO: Optimize to select between row filter or column filter for better performance - // Use row key filter priority by default - boolean rowFilterPriority = true; - - FilterList fltList = new FilterList(Operator.MUST_PASS_ONE); - for(ANDExpression andExpr : _orExpr.getANDExprList()){ - - FilterList list = new FilterList(Operator.MUST_PASS_ALL); - Map<String, List<String>> tagFilters = new HashMap<String, List<String>>(); - List<QualifierFilterEntity> qualifierFilters = new ArrayList<QualifierFilterEntity>(); -// List<QualifierFilterEntry> tagLikeQualifierFilters = new ArrayList<QualifierFilterEntry>(); - - // TODO refactor not to use too much if/else - for(AtomicExpression ae : andExpr.getAtomicExprList()){ - // TODO temporarily ignore those fields which are not for attributes - - String fieldName = ae.getKey(); - if(ae.getKeyType() == TokenType.ID){ - fieldName = parseEntityAttribute(fieldName); - if(fieldName == null){ - LOG.warn(fieldName + " field does not have format @<FieldName>, ignored"); - continue; - } - } - - String value = ae.getValue(); - ComparisonOperator op = ae.getOp(); - TokenType keyType = ae.getKeyType(); - TokenType valueType = ae.getValueType(); - QualifierFilterEntity entry = new 
QualifierFilterEntity(fieldName,value,op,keyType,valueType); - - // TODO Exact match, need to add escape for those special characters here, including: - // "-", "[", "]", "/", "{", "}", "(", ")", "*", "+", "?", ".", "\\", "^", "$", "|" - - if(keyType == TokenType.ID && isTag(fieldName)){ - if ((ComparisonOperator.EQUAL.equals(op) || ComparisonOperator.IS.equals(op)) - && !TokenType.NULL.equals(valueType)) - { - // Use RowFilter for equal TAG - if(tagFilters.get(fieldName) == null) tagFilters.put(fieldName, new ArrayList<String>()); - tagFilters.get(fieldName).add(value); - } else if (rowFilterPriority && ComparisonOperator.IN.equals(op)) - { - // Use RowFilter here by default - if(tagFilters.get(fieldName) == null) tagFilters.put(fieldName, new ArrayList<String>()); - tagFilters.get(fieldName).addAll(EntityQualifierUtils.parseList(value)); - } else if (ComparisonOperator.LIKE.equals(op) - || ComparisonOperator.NOT_LIKE.equals(op) - || ComparisonOperator.CONTAINS.equals(op) - || ComparisonOperator.NOT_CONTAINS.equals(op) - || ComparisonOperator.IN.equals(op) - || ComparisonOperator.IS.equals(op) - || ComparisonOperator.IS_NOT.equals(op) - || ComparisonOperator.NOT_EQUAL.equals(op) - || ComparisonOperator.EQUAL.equals(op) - || ComparisonOperator.NOT_IN.equals(op)) - { - qualifierFilters.add(entry); - } else - { - LOG.warn("Don't support operation: \"" + op + "\" on tag field: " + fieldName + " yet, going to ignore"); - throw new IllegalArgumentException("Don't support operation: "+op+" on tag field: "+fieldName+", avaliable options: =, =!, =~, !=~, in, not in, contains, not contains"); - } - }else{ - qualifierFilters.add(entry); - } - } - - // Build RowFilter for equal tags - list.addFilter(buildTagFilter(tagFilters)); - - // Build SingleColumnValueFilter - FilterList qualifierFilterList = buildQualifierFilter(qualifierFilters); - if(qualifierFilterList != null && qualifierFilterList.getFilters().size()>0){ - list.addFilter(qualifierFilterList); - }else { - if(LOG.isDebugEnabled()) LOG.debug("Ignore empty qualifier filter from "+qualifierFilters.toString()); - } - fltList.addFilter(list); - } - LOG.info("Query: " + _orExpr.toString() + " => Filter: " + fltList.toString()); - return fltList; - } - - /** - * _charset is used to decode the byte array, in hbase server, RegexStringComparator uses the same - * charset to decode the byte array stored in qualifier - * for tag filter regex, it's always ISO-8859-1 as it only comes from String's hashcode (Integer) - * Note: regex comparasion is to compare String - */ - protected Filter buildTagFilter(Map<String, List<String>> tagFilters){ - RegexStringComparator regexStringComparator = new RegexStringComparator(buildTagFilterRegex(tagFilters)); - regexStringComparator.setCharset(_charset); - RowFilter filter = new RowFilter(CompareOp.EQUAL, regexStringComparator); - return filter; - } - - /** - * all qualifiers' condition must be satisfied. 
- * - * <H1>Use RegexStringComparator for:</H1> - * IN - * LIKE - * NOT_LIKE - * - * <H1>Use SubstringComparator for:</H1> - * CONTAINS - * - * <H1>Use EntityQualifierHelper for:</H1> - * EQUALS - * NOT_EUQALS - * LESS - * LESS_OR_EQUAL - * GREATER - * GREATER_OR_EQUAL - * - * <H2> - * TODO: Compare performance of RegexStringComparator ,SubstringComparator ,EntityQualifierHelper - * </H2> - * - * @param qualifierFilters - * @return - */ - protected FilterList buildQualifierFilter(List<QualifierFilterEntity> qualifierFilters){ - FilterList list = new FilterList(Operator.MUST_PASS_ALL); - // iterate all the qualifiers - for(QualifierFilterEntity entry : qualifierFilters){ - // if contains expression based filter - if(entry.getKeyType() == TokenType.EXP - || entry.getValueType() == TokenType.EXP - || entry.getKeyType() != TokenType.ID){ - if(!EagleConfigFactory.load().isCoprocessorEnabled()) { - LOG.warn("Expression in filter may not support, because custom filter and coprocessor is disabled: " + entry.toString()); - } - list.addFilter(buildExpressionBasedFilter(entry)); - continue; - } - - // else using SingleColumnValueFilter - String qualifierName = entry.getKey(); - if(!isTag(entry.getKey())){ - Qualifier qualifier = _ed.getDisplayNameMap().get(entry.getKey()); - qualifierName = qualifier.getQualifierName(); - } - - // Comparator to be used for building HBase Filter - // WritableByteArrayComparable comparator; + private static final Logger LOG = LoggerFactory.getLogger(HBaseFilterBuilder.class); + + /* + * syntax is @<fieldname> + */ + // private static final String fnRegex = "^@(.*)$"; + private static final Pattern _fnPattern = TokenConstant.ID_PATTERN;// Pattern.compile(fnRegex); + private static final Charset _defaultCharset = Charset.forName("ISO-8859-1"); + + private ORExpression orExpr; + private EntityDefinition ed; + private boolean filterIfMissing; + private Charset charset = _defaultCharset; + + /** + * TODO: Verify performance impact + * + * @return + */ + public Set<String> getFilterFields() { + return filterFields; + } + + /** + * Just add filter fields for expression filter + */ + private Set<String> filterFields; + + public HBaseFilterBuilder(EntityDefinition ed, ORExpression orExpr) { + this(ed, orExpr, false); + } + + public HBaseFilterBuilder(EntityDefinition ed, ORExpression orExpr, boolean filterIfMissing) { + this.ed = ed; + this.orExpr = orExpr; + this.filterIfMissing = filterIfMissing; + } + + public void setCharset(String charsetName) { + charset = Charset.forName(charsetName); + } + + public Charset getCharset() { + return charset; + } + + /** + * Because we don't have metadata for tag, we regard non-qualifer field as tag. So one field possibly is + * not a real tag when this function return true. This happens when a user input an wrong field name which + * is neither tag or qualifier + * + * @param field + */ + private boolean isTag(String field) { + return ed.isTag(field); + } + + /** + * check whether this field is one entity attribute or not + * + * @param fieldName + * @return + */ + private String parseEntityAttribute(String fieldName) { + Matcher m = _fnPattern.matcher(fieldName); + if (m.find()) { + return m.group(1); + } + return null; + } + + /** + * Return the partition values for each or expression. The size of the returned list should be equal to + * the size of FilterList that {@link #buildFilters()} returns. TODO: For now we don't support one query + * to query multiple partitions. 
In future if partition is defined, for the entity, internally We need to + * spawn multiple queries and send one query for each partition. + * + * @return Return the partition values for each or expression. Return null if the entity doesn't support + * partition + */ + public List<String[]> getPartitionValues() { + final String[] partitions = ed.getPartitions(); + if (partitions == null || partitions.length == 0) { + return null; + } + final List<String[]> result = new ArrayList<String[]>(); + final Map<String, String> partitionKeyValueMap = new HashMap<String, String>(); + for (ANDExpression andExpr : orExpr.getANDExprList()) { + partitionKeyValueMap.clear(); + for (AtomicExpression ae : andExpr.getAtomicExprList()) { + // TODO temporarily ignore those fields which are not for attributes + if (ae.getKeyType() == TokenType.ID) { + final String fieldName = parseEntityAttribute(ae.getKey()); + if (fieldName == null) { + LOG.warn(fieldName + " field does not have format @<FieldName>, ignored"); + continue; + } + if (ed.isPartitionTag(fieldName) && ComparisonOperator.EQUAL.equals(ae.getOp())) { + final String value = ae.getValue(); + partitionKeyValueMap.put(fieldName, value); + } + } + } + final String[] values = new String[partitions.length]; + result.add(values); + for (int i = 0; i < partitions.length; ++i) { + final String partition = partitions[i]; + final String value = partitionKeyValueMap.get(partition); + values[i] = value; + } + } + return result; + } + + /** + * @see org.apache.eagle.query.parser.TokenType + * @return + */ + public FilterList buildFilters() { + // TODO: Optimize to select between row filter or column filter for better performance + // Use row key filter priority by default + boolean rowFilterPriority = true; + + FilterList fltList = new FilterList(Operator.MUST_PASS_ONE); + for (ANDExpression andExpr : orExpr.getANDExprList()) { + + FilterList list = new FilterList(Operator.MUST_PASS_ALL); + Map<String, List<String>> tagFilters = new HashMap<String, List<String>>(); + List<QualifierFilterEntity> qualifierFilters = new ArrayList<QualifierFilterEntity>(); + // List<QualifierFilterEntry> tagLikeQualifierFilters = new ArrayList<QualifierFilterEntry>(); + + // TODO refactor not to use too much if/else + for (AtomicExpression ae : andExpr.getAtomicExprList()) { + // TODO temporarily ignore those fields which are not for attributes + + String fieldName = ae.getKey(); + if (ae.getKeyType() == TokenType.ID) { + fieldName = parseEntityAttribute(fieldName); + if (fieldName == null) { + LOG.warn(fieldName + " field does not have format @<FieldName>, ignored"); + continue; + } + } + + String value = ae.getValue(); + ComparisonOperator op = ae.getOp(); + TokenType keyType = ae.getKeyType(); + TokenType valueType = ae.getValueType(); + QualifierFilterEntity entry = new QualifierFilterEntity(fieldName, value, op, keyType, + valueType); + + // TODO Exact match, need to add escape for those special characters here, including: + // "-", "[", "]", "/", "{", "}", "(", ")", "*", "+", "?", ".", "\\", "^", "$", "|" + + if (keyType == TokenType.ID && isTag(fieldName)) { + if ((ComparisonOperator.EQUAL.equals(op) || ComparisonOperator.IS.equals(op)) + && !TokenType.NULL.equals(valueType)) { + // Use RowFilter for equal TAG + if (tagFilters.get(fieldName) == null) { + tagFilters.put(fieldName, new ArrayList<String>()); + } + tagFilters.get(fieldName).add(value); + } else if (rowFilterPriority && ComparisonOperator.IN.equals(op)) { + // Use RowFilter here by default + if 
(tagFilters.get(fieldName) == null) { + tagFilters.put(fieldName, new ArrayList<String>()); + } + tagFilters.get(fieldName).addAll(EntityQualifierUtils.parseList(value)); + } else if (ComparisonOperator.LIKE.equals(op) || ComparisonOperator.NOT_LIKE.equals(op) + || ComparisonOperator.CONTAINS.equals(op) + || ComparisonOperator.NOT_CONTAINS.equals(op) + || ComparisonOperator.IN.equals(op) || ComparisonOperator.IS.equals(op) + || ComparisonOperator.IS_NOT.equals(op) + || ComparisonOperator.NOT_EQUAL.equals(op) + || ComparisonOperator.EQUAL.equals(op) + || ComparisonOperator.NOT_IN.equals(op)) { + qualifierFilters.add(entry); + } else { + LOG.warn("Don't support operation: \"" + op + "\" on tag field: " + fieldName + + " yet, going to ignore"); + throw new IllegalArgumentException("Don't support operation: " + op + + " on tag field: " + fieldName + + ", avaliable options: =, =!, =~, !=~, in, not in, contains, not contains"); + } + } else { + qualifierFilters.add(entry); + } + } + + // Build RowFilter for equal tags + list.addFilter(buildTagFilter(tagFilters)); + + // Build SingleColumnValueFilter + FilterList qualifierFilterList = buildQualifierFilter(qualifierFilters); + if (qualifierFilterList != null && qualifierFilterList.getFilters().size() > 0) { + list.addFilter(qualifierFilterList); + } else { + if (LOG.isDebugEnabled()) { + LOG.debug("Ignore empty qualifier filter from " + qualifierFilters.toString()); + } + } + fltList.addFilter(list); + } + LOG.info("Query: " + orExpr.toString() + " => Filter: " + fltList.toString()); + return fltList; + } + + /** + * charset is used to decode the byte array, in hbase server, RegexStringComparator uses the same charset + * to decode the byte array stored in qualifier for tag filter regex, it's always ISO-8859-1 as it only + * comes from String's hashcode (Integer) Note: regex comparasion is to compare String + */ + protected Filter buildTagFilter(Map<String, List<String>> tagFilters) { + RegexStringComparator regexStringComparator = new RegexStringComparator(buildTagFilterRegex(tagFilters)); + regexStringComparator.setCharset(charset); + RowFilter filter = new RowFilter(CompareOp.EQUAL, regexStringComparator); + return filter; + } + + /** + * all qualifiers' condition must be satisfied. 
+ * <H1>Use RegexStringComparator for:</H1> IN LIKE NOT_LIKE + * <H1>Use SubstringComparator for:</H1> CONTAINS + * <H1>Use EntityQualifierHelper for:</H1> EQUALS NOT_EUQALS LESS LESS_OR_EQUAL GREATER GREATER_OR_EQUAL + * <H2>TODO: Compare performance of RegexStringComparator ,SubstringComparator ,EntityQualifierHelper</H2> + * + * @param qualifierFilters + * @return + */ + protected FilterList buildQualifierFilter(List<QualifierFilterEntity> qualifierFilters) { + FilterList list = new FilterList(Operator.MUST_PASS_ALL); + // iterate all the qualifiers + for (QualifierFilterEntity entry : qualifierFilters) { + // if contains expression based filter + if (entry.getKeyType() == TokenType.EXP || entry.getValueType() == TokenType.EXP + || entry.getKeyType() != TokenType.ID) { + if (!EagleConfigFactory.load().isCoprocessorEnabled()) { + LOG.warn("Expression in filter may not support, because custom filter and coprocessor is disabled: " + + entry.toString()); + } + list.addFilter(buildExpressionBasedFilter(entry)); + continue; + } + + // else using SingleColumnValueFilter + String qualifierName = entry.getKey(); + if (!isTag(entry.getKey())) { + Qualifier qualifier = ed.getDisplayNameMap().get(entry.getKey()); + qualifierName = qualifier.getQualifierName(); + } + + // Comparator to be used for building HBase Filter + // WritableByteArrayComparable comparator; ByteArrayComparable comparable; - if(ComparisonOperator.IN.equals(entry.getOp()) - || ComparisonOperator.NOT_IN.equals(entry.getOp())){ - Filter setFilter = buildListQualifierFilter(entry); - if(setFilter!=null){ - list.addFilter(setFilter); - } - }else{ - // If [=,!=,is,is not] NULL, use NullComparator else throw exception - if(TokenType.NULL.equals(entry.getValueType())){ - if(ComparisonOperator.EQUAL.equals(entry.getOp()) - ||ComparisonOperator.NOT_EQUAL.equals(entry.getOp()) - ||ComparisonOperator.IS.equals(entry.getOp()) - ||ComparisonOperator.IS_NOT.equals(entry.getOp())) + if (ComparisonOperator.IN.equals(entry.getOp()) + || ComparisonOperator.NOT_IN.equals(entry.getOp())) { + Filter setFilter = buildListQualifierFilter(entry); + if (setFilter != null) { + list.addFilter(setFilter); + } + } else { + // If [=,!=,is,is not] NULL, use NullComparator else throw exception + if (TokenType.NULL.equals(entry.getValueType())) { + if (ComparisonOperator.EQUAL.equals(entry.getOp()) + || ComparisonOperator.NOT_EQUAL.equals(entry.getOp()) + || ComparisonOperator.IS.equals(entry.getOp()) + || ComparisonOperator.IS_NOT.equals(entry.getOp())) { comparable = new NullComparator(); - else - throw new IllegalArgumentException("Operation: "+entry.getOp()+" with NULL is not supported yet: "+entry.toString()+", avaliable options: [=, !=, is, is not] null|NULL"); - } - // If [contains, not contains],use SubstringComparator - else if (ComparisonOperator.CONTAINS.equals(entry.getOp()) - || ComparisonOperator.NOT_CONTAINS.equals(entry.getOp())) { + } else { + throw new IllegalArgumentException("Operation: " + entry.getOp() + + " with NULL is not supported yet: " + + entry.toString() + + ", avaliable options: [=, !=, is, is not] null|NULL"); + } + } else if (ComparisonOperator.CONTAINS.equals(entry.getOp()) + || ComparisonOperator.NOT_CONTAINS.equals(entry.getOp())) { + // If [contains, not contains],use SubstringComparator comparable = new SubstringComparator(entry.getValue()); - } - // If [like, not like], use RegexStringComparator - else if (ComparisonOperator.LIKE.equals(entry.getOp()) - || ComparisonOperator.NOT_LIKE.equals(entry.getOp())){ - // Use 
RegexStringComparator for LIKE / NOT_LIKE - RegexStringComparator _comparator = new RegexStringComparator(buildQualifierRegex(entry.getValue())); - _comparator.setCharset(_charset); + } else if (ComparisonOperator.LIKE.equals(entry.getOp()) + || ComparisonOperator.NOT_LIKE.equals(entry.getOp())) { + // If [like, not like], use RegexStringComparator + // Use RegexStringComparator for LIKE / NOT_LIKE + RegexStringComparator _comparator = new RegexStringComparator(buildQualifierRegex(entry + .getValue())); + _comparator.setCharset(charset); comparable = _comparator; - } else{ - Class type = EntityQualifierUtils.getType(_ed, entry.getKey()); - // if type is null (is Tag or not found) or not defined for TypedByteArrayComparator - if(!EagleConfigFactory.load().isCoprocessorEnabled() || type == null || TypedByteArrayComparator.get(type) == null){ - comparable = new BinaryComparator(EntityQualifierUtils.toBytes(_ed, entry.getKey(), entry.getValue())); - }else { - comparable = new TypedByteArrayComparator(EntityQualifierUtils.toBytes(_ed, entry.getKey(), entry.getValue()),type); - } - } - - SingleColumnValueFilter filter = - new SingleColumnValueFilter(_ed.getColumnFamily().getBytes(), qualifierName.getBytes(), convertToHBaseCompareOp(entry.getOp()), comparable); - filter.setFilterIfMissing(_filterIfMissing); - list.addFilter(filter); - } - } - - return list; - } - - private Filter buildExpressionBasedFilter(QualifierFilterEntity entry) { - BooleanExpressionComparator expressionComparator = new BooleanExpressionComparator(entry,_ed); - _filterFields = expressionComparator.getRequiredFields(); - RowValueFilter filter = new RowValueFilter(expressionComparator); - return filter; - } - - /** - * Currently use BinaryComparator only - * <h2>TODO: </h2> - * Possibility to tune performance by using: OR[BinaryComparator,...] instead of RegexStringComparator? - * - *<br/> <br/> - * - * ! Check op must be IN or NOTIN in caller - * - * @param entry - * @return - */ - private Filter buildListQualifierFilter(QualifierFilterEntity entry){ - List<String> valueSet = EntityQualifierUtils.parseList(entry.getValue()); - Iterator<String> it = valueSet.iterator(); - String fieldName = entry.getKey(); - String qualifierName = fieldName; - if(!_ed.isTag(entry.getKey())){ - qualifierName = _ed.getDisplayNameMap().get(entry.getKey()).getQualifierName(); - } - -// TODO: Try to use RegExp just work if possible -// Because single SingleColumnValueFilter is much faster than multi SingleColumnValueFilters in OR list. 
-// Class qualifierType = EntityQualifierHelper.getType(_ed,fieldName); -// if( qualifierType == null || qualifierType == String.class){ -// boolean first = true; -// StringBuilder filterRegex = new StringBuilder(); -// filterRegex.append("^("); -// while(it.hasNext()) { -// String value = it.next(); -// if(value == null) { -// logger.warn("ignore empty value in set qualifier filter: "+entry.toString()); -// continue; -// } -// if(!first) filterRegex.append("|"); -// filterRegex.append(value); -// first = false; -// } -// filterRegex.append(")$"); -// RegexStringComparator regexStringComparator = new RegexStringComparator(filterRegex.toString()); -// return new SingleColumnValueFilter(_ed.getColumnFamily().getBytes(), qualifierName.getBytes(), -// convertToHBaseCompareOp(entry.getOp()), regexStringComparator); -// }else{ - FilterList setFilterList; - if(ComparisonOperator.IN.equals(entry.getOp())){ - setFilterList = new FilterList(Operator.MUST_PASS_ONE); - }else if(ComparisonOperator.NOT_IN.equals(entry.getOp())) { - setFilterList = new FilterList(Operator.MUST_PASS_ALL); - }else{ - throw new IllegalArgumentException(String.format("Don't support operation: %s on LIST type of value yet: %s, valid options: IN/NOT IN [LIST]",entry.getOp(),entry.toString())); - } - - while(it.hasNext()) { - String value = it.next(); - BinaryComparator comparator = new BinaryComparator(EntityQualifierUtils.toBytes(_ed, fieldName, value)); - SingleColumnValueFilter filter = - new SingleColumnValueFilter(_ed.getColumnFamily().getBytes(), qualifierName.getBytes(), convertToHBaseCompareOp(entry.getOp()), comparator); - filter.setFilterIfMissing(_filterIfMissing); - setFilterList.addFilter(filter); - } - - return setFilterList; -// } - } - - /** - * Just used for LIKE and NOT_LIKE - * - * @param qualifierValue - * @return - */ - protected String buildQualifierRegex(String qualifierValue){ - StringBuilder sb = new StringBuilder(); -// sb.append("(?s)"); - sb.append("^"); - sb.append(qualifierValue); - sb.append("$"); - return sb.toString(); - } - - /** - * Appends the given ID to the given buffer, followed by "\\E". - * [steal it from opentsdb, thanks opentsdb :) https://github.com/OpenTSDB/opentsdb/blob/master/src/core/TsdbQuery.java] - */ - private static void addId(final StringBuilder buf, final byte[] id) { - buf.append("\\Q"); - boolean backslash = false; - for (final byte b : id) { - buf.append((char) (b & 0xFF)); - if (b == 'E' && backslash) { // If we saw a `\' and now we have a `E'. - // So we just terminated the quoted section because we just added \E - // to `buf'. So let's put a litteral \E now and start quoting again. - buf.append("\\\\E\\Q"); - } else { - backslash = b == '\\'; - } - } - buf.append("\\E"); - } - - @SuppressWarnings("unused") - private static void addId(final StringBuilder buf, final String id) { - buf.append("\\Q"); - int len = id.length()-1; - boolean backslash = false; - for (int i =0; i < len; i++) { - char c = id.charAt(i); - buf.append(c); - if (c == 'E' && backslash) { // If we saw a `\' and now we have a `E'. - // So we just terminated the quoted section because we just added \E - // to `buf'. So let's put a litteral \E now and start quoting again. 
- buf.append("\\\\E\\Q"); - } else { - backslash = c == '\\'; - } - } - buf.append("\\E"); - } - - /** - * one search tag may have multiple values which have OR relationship, and relationship between - * different search tags is AND - * the query is like "(TAG1=value11 OR TAG1=value12) AND TAG2=value2" - * @param tags - * @return - */ - protected String buildTagFilterRegex(Map<String, List<String>> tags){ - // TODO need consider that \E could be part of tag, refer to https://github.com/OpenTSDB/opentsdb/blob/master/src/core/TsdbQuery.java - final SortedMap<Integer, List<Integer>> tagHash = new TreeMap<Integer, List<Integer>>(); - final int numOfPartitionFields = (_ed.getPartitions() == null) ? 0 : _ed.getPartitions().length; - for(Map.Entry<String, List<String>> entry : tags.entrySet()){ - String tagName = entry.getKey(); - // Ignore tag if the tag is one of partition fields - if (_ed.isPartitionTag(tagName)) { - continue; - } - List<String> stringValues = entry.getValue(); - List<Integer> hashValues = new ArrayList<Integer>(stringValues.size()); - for(String value : stringValues){ - hashValues.add(value.hashCode()); - } - tagHash.put(tagName.hashCode(), hashValues); - } - - // header = prefix(4 bytes) + partition_hashes(4*N bytes) + timestamp (8 bytes) - final int headerLength = 4 + numOfPartitionFields * 4 + 8; - - // <tag1:4><value1:4> ... <tagn:4><valuen:4> - StringBuilder sb = new StringBuilder(); - sb.append("(?s)"); - sb.append("^(?:.{").append(headerLength).append("})"); - sb.append("(?:.{").append(8).append("})*"); // for any number of tags - for (Map.Entry<Integer, List<Integer>> entry : tagHash.entrySet()) { - try { - addId(sb, ByteUtil.intToBytes(entry.getKey())); - List<Integer> hashValues = entry.getValue(); - sb.append("(?:"); - boolean first = true; - for(Integer value : hashValues){ - if(!first){ - sb.append('|'); - } - addId(sb, ByteUtil.intToBytes(value)); - first = false; - } - sb.append(")"); - sb.append("(?:.{").append(8).append("})*"); // for any number of tags - } catch (Exception ex) { - LOG.error("constructing regex error", ex); - } - } - sb.append("$"); - if(LOG.isDebugEnabled()) LOG.debug("Tag filter pattern is " + sb.toString()); - return sb.toString(); - } - - /** - * Convert ComparisonOperator to native HBase CompareOp - * - * Support: - * =, =~,CONTAINS,<,<=,>,>=,!=,!=~ - * - * @param comp - * @return - */ - protected static CompareOp convertToHBaseCompareOp(ComparisonOperator comp) { - if(comp == ComparisonOperator.EQUAL || comp == ComparisonOperator.LIKE - || comp == ComparisonOperator.CONTAINS - || comp == ComparisonOperator.IN - || comp == ComparisonOperator.IS - ) { - return CompareOp.EQUAL; - }else if(comp == ComparisonOperator.LESS) { - return CompareOp.LESS; - } else if(comp == ComparisonOperator.LESS_OR_EQUAL){ - return CompareOp.LESS_OR_EQUAL; - }else if(comp == ComparisonOperator.GREATER) { - return CompareOp.GREATER; - } else if(comp == ComparisonOperator.GREATER_OR_EQUAL){ - return CompareOp.GREATER_OR_EQUAL; - } else if(comp == ComparisonOperator.NOT_EQUAL - || comp == ComparisonOperator.NOT_LIKE - || comp == ComparisonOperator.NOT_CONTAINS - || comp == ComparisonOperator.IS_NOT - || comp == ComparisonOperator.NOT_IN) - { - return CompareOp.NOT_EQUAL; - } else { - LOG.error("{} operation is not supported now\n", comp); - throw new IllegalArgumentException("Illegal operation: "+comp+ ", avaliable options: "+ Arrays.toString(ComparisonOperator.values())); - } - } - - protected static CompareOp getHBaseCompareOp(String comp) { - return 
convertToHBaseCompareOp(ComparisonOperator.locateOperator(comp)); - } + } else { + Class type = EntityQualifierUtils.getType(ed, entry.getKey()); + // if type is null (is Tag or not found) or not defined for TypedByteArrayComparator + if (!EagleConfigFactory.load().isCoprocessorEnabled() || type == null + || TypedByteArrayComparator.get(type) == null) { + comparable = new BinaryComparator(EntityQualifierUtils.toBytes(ed, entry.getKey(), + entry.getValue())); + } else { + comparable = new TypedByteArrayComparator(EntityQualifierUtils + .toBytes(ed, entry.getKey(), entry.getValue()), type); + } + } + + SingleColumnValueFilter filter = new SingleColumnValueFilter(ed.getColumnFamily() + .getBytes(), qualifierName.getBytes(), convertToHBaseCompareOp(entry.getOp()), + comparable); + filter.setFilterIfMissing(filterIfMissing); + list.addFilter(filter); + } + } + + return list; + } + + private Filter buildExpressionBasedFilter(QualifierFilterEntity entry) { + BooleanExpressionComparator expressionComparator = new BooleanExpressionComparator(entry, ed); + filterFields = expressionComparator.getRequiredFields(); + RowValueFilter filter = new RowValueFilter(expressionComparator); + return filter; + } + + /** + * Currently use BinaryComparator only + * <h2>TODO:</h2> Possibility to tune performance by using: OR[BinaryComparator,...] instead of + * RegexStringComparator? <br/> + * <br/> + * ! Check op must be IN or NOTIN in caller + * + * @param entry + * @return + */ + private Filter buildListQualifierFilter(QualifierFilterEntity entry) { + List<String> valueSet = EntityQualifierUtils.parseList(entry.getValue()); + Iterator<String> it = valueSet.iterator(); + String fieldName = entry.getKey(); + String qualifierName = fieldName; + if (!ed.isTag(entry.getKey())) { + qualifierName = ed.getDisplayNameMap().get(entry.getKey()).getQualifierName(); + } + + // TODO: Try to use RegExp just work if possible + // Because single SingleColumnValueFilter is much faster than multi SingleColumnValueFilters in OR + // list. 
+ // Class qualifierType = EntityQualifierHelper.getType(ed,fieldName); + // if( qualifierType == null || qualifierType == String.class){ + // boolean first = true; + // StringBuilder filterRegex = new StringBuilder(); + // filterRegex.append("^("); + // while(it.hasNext()) { + // String value = it.next(); + // if(value == null) { + // logger.warn("ignore empty value in set qualifier filter: "+entry.toString()); + // continue; + // } + // if(!first) filterRegex.append("|"); + // filterRegex.append(value); + // first = false; + // } + // filterRegex.append(")$"); + // RegexStringComparator regexStringComparator = new RegexStringComparator(filterRegex.toString()); + // return new SingleColumnValueFilter(ed.getColumnFamily().getBytes(), qualifierName.getBytes(), + // convertToHBaseCompareOp(entry.getOp()), regexStringComparator); + // }else{ + FilterList setFilterList; + if (ComparisonOperator.IN.equals(entry.getOp())) { + setFilterList = new FilterList(Operator.MUST_PASS_ONE); + } else if (ComparisonOperator.NOT_IN.equals(entry.getOp())) { + setFilterList = new FilterList(Operator.MUST_PASS_ALL); + } else { + throw new IllegalArgumentException(String + .format("Don't support operation: %s on LIST type of value yet: %s, valid options: IN/NOT IN [LIST]", + entry.getOp(), entry.toString())); + } + + while (it.hasNext()) { + String value = it.next(); + BinaryComparator comparator = new BinaryComparator(EntityQualifierUtils.toBytes(ed, fieldName, + value)); + SingleColumnValueFilter filter = new SingleColumnValueFilter(ed.getColumnFamily() + .getBytes(), qualifierName.getBytes(), convertToHBaseCompareOp(entry.getOp()), comparator); + filter.setFilterIfMissing(filterIfMissing); + setFilterList.addFilter(filter); + } + + return setFilterList; + // } + } + + /** + * Just used for LIKE and NOT_LIKE + * + * @param qualifierValue + * @return + */ + protected String buildQualifierRegex(String qualifierValue) { + StringBuilder sb = new StringBuilder(); + // sb.append("(?s)"); + sb.append("^"); + sb.append(qualifierValue); + sb.append("$"); + return sb.toString(); + } + + /** + * Appends the given ID to the given buffer, followed by "\\E". [steal it from opentsdb, thanks opentsdb + * :) https://github.com/OpenTSDB/opentsdb/blob/master/src/core/TsdbQuery.java] + */ + private static void addId(final StringBuilder buf, final byte[] id) { + buf.append("\\Q"); + boolean backslash = false; + for (final byte b : id) { + buf.append((char)(b & 0xFF)); + if (b == 'E' && backslash) { // If we saw a `\' and now we have a `E'. + // So we just terminated the quoted section because we just added \E + // to `buf'. So let's put a litteral \E now and start quoting again. + buf.append("\\\\E\\Q"); + } else { + backslash = b == '\\'; + } + } + buf.append("\\E"); + } + + @SuppressWarnings("unused") + private static void addId(final StringBuilder buf, final String id) { + buf.append("\\Q"); + int len = id.length() - 1; + boolean backslash = false; + for (int i = 0; i < len; i++) { + char c = id.charAt(i); + buf.append(c); + if (c == 'E' && backslash) { // If we saw a `\' and now we have a `E'. + // So we just terminated the quoted section because we just added \E + // to `buf'. So let's put a litteral \E now and start quoting again. 
+ buf.append("\\\\E\\Q"); + } else { + backslash = c == '\\'; + } + } + buf.append("\\E"); + } + + /** + * one search tag may have multiple values which have OR relationship, and relationship between different + * search tags is AND the query is like "(TAG1=value11 OR TAG1=value12) AND TAG2=value2" + * + * @param tags + * @return + */ + protected String buildTagFilterRegex(Map<String, List<String>> tags) { + // TODO need consider that \E could be part of tag, refer to + // https://github.com/OpenTSDB/opentsdb/blob/master/src/core/TsdbQuery.java + final SortedMap<Integer, List<Integer>> tagHash = new TreeMap<Integer, List<Integer>>(); + final int numOfPartitionFields = (ed.getPartitions() == null) ? 0 : ed.getPartitions().length; + for (Map.Entry<String, List<String>> entry : tags.entrySet()) { + String tagName = entry.getKey(); + // Ignore tag if the tag is one of partition fields + if (ed.isPartitionTag(tagName)) { + continue; + } + List<String> stringValues = entry.getValue(); + List<Integer> hashValues = new ArrayList<Integer>(stringValues.size()); + for (String value : stringValues) { + hashValues.add(value.hashCode()); + } + tagHash.put(tagName.hashCode(), hashValues); + } + + // header = prefix(4 bytes) + partition_hashes(4*N bytes) + timestamp (8 bytes) + final int headerLength = 4 + numOfPartitionFields * 4 + 8; + + // <tag1:4><value1:4> ... <tagn:4><valuen:4> + StringBuilder sb = new StringBuilder(); + sb.append("(?s)"); + sb.append("^(?:.{").append(headerLength).append("})"); + sb.append("(?:.{").append(8).append("})*"); // for any number of tags + for (Map.Entry<Integer, List<Integer>> entry : tagHash.entrySet()) { + try { + addId(sb, ByteUtil.intToBytes(entry.getKey())); + List<Integer> hashValues = entry.getValue(); + sb.append("(?:"); + boolean first = true; + for (Integer value : hashValues) { + if (!first) { + sb.append('|'); + } + addId(sb, ByteUtil.intToBytes(value)); + first = false; + } + sb.append(")"); + sb.append("(?:.{").append(8).append("})*"); // for any number of tags + } catch (Exception ex) { + LOG.error("constructing regex error", ex); + } + } + sb.append("$"); + if (LOG.isDebugEnabled()) { + LOG.debug("Tag filter pattern is " + sb.toString()); + } + return sb.toString(); + } + + /** + * Convert ComparisonOperator to native HBase CompareOp Support: =, =~,CONTAINS,<,<=,>,>=,!=,!=~ + * + * @param comp + * @return + */ + protected static CompareOp convertToHBaseCompareOp(ComparisonOperator comp) { + if (comp == ComparisonOperator.EQUAL || comp == ComparisonOperator.LIKE + || comp == ComparisonOperator.CONTAINS || comp == ComparisonOperator.IN + || comp == ComparisonOperator.IS) { + return CompareOp.EQUAL; + } else if (comp == ComparisonOperator.LESS) { + return CompareOp.LESS; + } else if (comp == ComparisonOperator.LESS_OR_EQUAL) { + return CompareOp.LESS_OR_EQUAL; + } else if (comp == ComparisonOperator.GREATER) { + return CompareOp.GREATER; + } else if (comp == ComparisonOperator.GREATER_OR_EQUAL) { + return CompareOp.GREATER_OR_EQUAL; + } else if (comp == ComparisonOperator.NOT_EQUAL || comp == ComparisonOperator.NOT_LIKE + || comp == ComparisonOperator.NOT_CONTAINS || comp == ComparisonOperator.IS_NOT + || comp == ComparisonOperator.NOT_IN) { + return CompareOp.NOT_EQUAL; + } else { + LOG.error("{} operation is not supported now\n", comp); + throw new IllegalArgumentException("Illegal operation: " + comp + ", avaliable options: " + + Arrays.toString(ComparisonOperator.values())); + } + } + + protected static CompareOp getHBaseCompareOp(String comp) { + 
return convertToHBaseCompareOp(ComparisonOperator.locateOperator(comp)); + } }
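For orientation, the sketch below wires the reformatted HBaseFilterBuilder into the call sequence its class Javadoc describes: an ORExpression produced by the eagle-antlr query parser plus the entity's EntityDefinition go in, and an HBase FilterList (MUST_PASS_ONE across AND-expressions, MUST_PASS_ALL within each) plus per-expression partition values come out. This is a minimal sketch, not part of this commit: the parser entry point (EagleQueryParser), the metadata lookup (EntityDefinitionManager.getEntityByServiceName), the service name and the query string are assumptions for illustration only.

import java.util.List;

import org.apache.eagle.log.entity.filter.HBaseFilterBuilder;
import org.apache.eagle.log.entity.meta.EntityDefinition;
import org.apache.eagle.log.entity.meta.EntityDefinitionManager;
import org.apache.eagle.query.parser.EagleQueryParser;
import org.apache.eagle.query.parser.ORExpression;
import org.apache.hadoop.hbase.filter.FilterList;

public class HBaseFilterBuilderSketch {
    public static void main(String[] args) throws Exception {
        // Step 1 of the class Javadoc: obtain an ORExpression from the eagle-antlr parser.
        // The query string and field names (@cluster, @datacenter) are hypothetical;
        // fields follow the @<fieldname> syntax noted in the builder.
        ORExpression orExpr =
            new EagleQueryParser("@cluster = \"cluster1\" AND @datacenter = \"dc1\"").parse();

        // Entity metadata tells the builder which fields are tags (row key) and which are qualifiers.
        // "SampleService" is a placeholder service name (assumed lookup API).
        EntityDefinition ed = EntityDefinitionManager.getEntityByServiceName("SampleService");

        // Steps 2.x: one MUST_PASS_ALL list per AND-expression, combined under MUST_PASS_ONE.
        HBaseFilterBuilder builder = new HBaseFilterBuilder(ed, orExpr);
        FilterList filters = builder.buildFilters();

        // One String[] of partition values per AND-expression; null if the entity has no partitions.
        List<String[]> partitionValues = builder.getPartitionValues();

        System.out.println(filters + " / partitions: " + partitionValues);
    }
}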
http://git-wip-us.apache.org/repos/asf/eagle/blob/6e919c2e/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/filter/QualifierFilterEntity.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/filter/QualifierFilterEntity.java b/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/filter/QualifierFilterEntity.java index 6cdc77b..340c33b 100755 --- a/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/filter/QualifierFilterEntity.java +++ b/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/filter/QualifierFilterEntity.java @@ -24,82 +24,86 @@ import java.io.DataInput; import java.io.DataOutput; import java.io.IOException; -public class QualifierFilterEntity implements Writable{ - public String key; - public String value; - public ComparisonOperator op; - public TokenType valueType; - public TokenType keyType; - - public QualifierFilterEntity(){} - public QualifierFilterEntity(String key, String value, ComparisonOperator comp, TokenType keyType, TokenType valueType) { - super(); - this.key = key; - this.value = value; - this.op = comp; - this.keyType = keyType; - this.valueType = valueType; - } - - public String getKey() { - return key; - } - - public void setKey(String key) { - this.key = key; - } - - public String getValue() { - return value; - } - - public void setValue(String value) { - this.value = value; - } - - public ComparisonOperator getOp() { - return op; - } - - public void setOp(ComparisonOperator op) { - this.op = op; - } - - public TokenType getValueType() { - return valueType; - } - - public void setValueType(TokenType valueType) { - this.valueType = valueType; - } - - public void setKeyType(TokenType keyType){ - this.keyType = keyType; - } - public TokenType getKeyType(){ - return this.keyType; - } - - @Override - public String toString() { - return String.format("%s %s %s",this.key,this.op,this.value); - } - - @Override - public void write(DataOutput out) throws IOException { - out.writeUTF(this.key); - out.writeUTF(this.getValue()); - out.writeUTF(this.op.name()); - out.writeUTF(this.keyType.name()); - out.writeUTF(this.valueType.name()); - } - - @Override - public void readFields(DataInput in) throws IOException { - this.key = in.readUTF(); - this.value = in.readUTF(); - this.op = ComparisonOperator.valueOf(in.readUTF()); - this.keyType = TokenType.valueOf(in.readUTF()); - this.valueType = TokenType.valueOf(in.readUTF()); - } -} \ No newline at end of file +public class QualifierFilterEntity implements Writable { + public String key; + public String value; + public ComparisonOperator op; + public TokenType valueType; + public TokenType keyType; + + public QualifierFilterEntity() { + } + + public QualifierFilterEntity(String key, String value, ComparisonOperator comp, TokenType keyType, + TokenType valueType) { + super(); + this.key = key; + this.value = value; + this.op = comp; + this.keyType = keyType; + this.valueType = valueType; + } + + public String getKey() { + return key; + } + + public void setKey(String key) { + this.key = key; + } + + public String getValue() { + return value; + } + + public void setValue(String value) { + this.value = value; + } + + public ComparisonOperator getOp() { + return op; + } + + public void setOp(ComparisonOperator op) { + this.op = op; + } + + public TokenType getValueType() { + return 
valueType; + } + + public void setValueType(TokenType valueType) { + this.valueType = valueType; + } + + public void setKeyType(TokenType keyType) { + this.keyType = keyType; + } + + public TokenType getKeyType() { + return this.keyType; + } + + @Override + public String toString() { + return String.format("%s %s %s", this.key, this.op, this.value); + } + + @Override + public void write(DataOutput out) throws IOException { + out.writeUTF(this.key); + out.writeUTF(this.getValue()); + out.writeUTF(this.op.name()); + out.writeUTF(this.keyType.name()); + out.writeUTF(this.valueType.name()); + } + + @Override + public void readFields(DataInput in) throws IOException { + this.key = in.readUTF(); + this.value = in.readUTF(); + this.op = ComparisonOperator.valueOf(in.readUTF()); + this.keyType = TokenType.valueOf(in.readUTF()); + this.valueType = TokenType.valueOf(in.readUTF()); + } +} http://git-wip-us.apache.org/repos/asf/eagle/blob/6e919c2e/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/filter/RowValueFilter.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/filter/RowValueFilter.java b/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/filter/RowValueFilter.java index a4b97ea..91a6939 100755 --- a/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/filter/RowValueFilter.java +++ b/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/filter/RowValueFilter.java @@ -33,32 +33,34 @@ import java.io.IOException; import java.util.List; /** - * TODO: Critical performance problem!!! - * TODO: Refactor to specified multi-column filter so that avoid return all qualifier columns from region server to client side + * TODO: Critical performance problem!!! 
TODO: Refactor to specified multi-column filter so that avoid return + * all qualifier columns from region server to client side * * @since 2014/11/17 */ public class RowValueFilter extends FilterBase { - private final static Logger LOG = LoggerFactory.getLogger(RowValueFilter.class); + private static final Logger LOG = LoggerFactory.getLogger(RowValueFilter.class); private boolean filterOutRow = false; private WritableComparable<List<KeyValue>> comparator; // TODO: Use qualifiers to reduce network tranfer -// private List<byte[]> qualifiers; - public RowValueFilter(){} + // private List<byte[]> qualifiers; + public RowValueFilter() { + } /** * Filter out row if WritableComparable.compareTo return 0 + * * @param comparator <code>WritableComparable[List[KeyValue]]</code> */ - public RowValueFilter(WritableComparable<List<KeyValue>> comparator){ + public RowValueFilter(WritableComparable<List<KeyValue>> comparator) { this.comparator = comparator; } -// public RowValueFilter(List<byte[]> qualifiers,WritableComparable<List<KeyValue>> comparator){ -// this.qualifiers = qualifiers; -// this.comparator = comparator; -// } + // public RowValueFilter(List<byte[]> qualifiers,WritableComparable<List<KeyValue>> comparator){ + // this.qualifiers = qualifiers; + // this.comparator = comparator; + // } /** * Old interface in hbase-0.94 @@ -77,7 +79,7 @@ public class RowValueFilter extends FilterBase { * @param in * @throws IOException */ -// @Override + // @Override @Deprecated public void readFields(DataInput in) throws IOException { this.comparator = new BooleanExpressionComparator(); @@ -85,7 +87,8 @@ public class RowValueFilter extends FilterBase { } /** - * TODO: Currently still use older serialization method from hbase-0.94, need to migrate into ProtoBuff based + * TODO: Currently still use older serialization method from hbase-0.94, need to migrate into ProtoBuff + * based * * @return * @throws IOException @@ -98,23 +101,24 @@ public class RowValueFilter extends FilterBase { } /** - * TODO: Currently still use older serialization method from hbase-0.94, need to migrate into ProtoBuff based + * TODO: Currently still use older serialization method from hbase-0.94, need to migrate into ProtoBuff + * based */ // Override static method - public static Filter parseFrom(final byte [] pbBytes) throws DeserializationException { + public static Filter parseFrom(final byte[] pbBytes) throws DeserializationException { ByteArrayDataInput byteArrayDataInput = ByteStreams.newDataInput(pbBytes); RowValueFilter filter = new RowValueFilter(); try { filter.readFields(byteArrayDataInput); } catch (IOException e) { - LOG.error("Got error to deserialize RowValueFilter from PB bytes",e); + LOG.error("Got error to deserialize RowValueFilter from PB bytes", e); throw new DeserializationException(e); } return filter; } @Override - public boolean hasFilterRow(){ + public boolean hasFilterRow() { return true; } @@ -124,21 +128,21 @@ public class RowValueFilter extends FilterBase { } @Override - public void reset() { - this.filterOutRow = false; + public boolean filterRow() { + return filterOutRow; } @Override - public boolean filterRow(){ - return filterOutRow; + public void reset() { + this.filterOutRow = false; } @Override public String toString() { - return super.toString()+" ( "+this.comparator.toString()+" )"; + return super.toString() + " ( " + this.comparator.toString() + " )"; } -// public List<byte[]> getQualifiers() { -// return qualifiers; -// } -} \ No newline at end of file + // public List<byte[]> 
getQualifiers() { + // return qualifiers; + // } +} http://git-wip-us.apache.org/repos/asf/eagle/blob/6e919c2e/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/filter/TypedByteArrayComparator.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/filter/TypedByteArrayComparator.java b/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/filter/TypedByteArrayComparator.java index ecaf8cc..74a13c1 100755 --- a/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/filter/TypedByteArrayComparator.java +++ b/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/filter/TypedByteArrayComparator.java @@ -32,15 +32,12 @@ import java.util.HashMap; import java.util.Map; /** - * <h1>TypedByteArrayComparator</h1> - * - * Compare byte array: <code>byte[] value</code> with class type: <code>Class type</code> - * - * <br/> + * <h1>TypedByteArrayComparator</h1> Compare byte array: <code>byte[] value</code> with class type: + * <code>Class type</code> <br/> * <br/> * Built-in support: * - * <pre> + * <pre> * Double * double * Integer @@ -51,14 +48,14 @@ import java.util.Map; * short * Boolean * boolean - * </pre> + * </pre> * - * And can be extend by defining new {@link RawComparator} and register with {@link #define(Class type, RawComparator comparator)} - * <br/> + * And can be extend by defining new {@link RawComparator} and register with + * {@link #define(Class type, RawComparator comparator)} <br/> * <br/> */ public class TypedByteArrayComparator extends ByteArrayComparable { - private final static Logger LOG = LoggerFactory.getLogger(TypedByteArrayComparator.class); + private static final Logger LOG = LoggerFactory.getLogger(TypedByteArrayComparator.class); private Class type; @@ -69,34 +66,38 @@ public class TypedByteArrayComparator extends ByteArrayComparable { * Default constructor for writable */ @SuppressWarnings("unused") - public TypedByteArrayComparator(){ + public TypedByteArrayComparator() { super(null); } - public TypedByteArrayComparator(byte[] value, Class type){ + public TypedByteArrayComparator(byte[] value, Class type) { super(value); this.type = type; this.comparator = get(this.type); - if(this.comparator == null) throw new IllegalArgumentException("No comparator found for class: "+type); + if (this.comparator == null) { + throw new IllegalArgumentException("No comparator found for class: " + type); + } } /** * @param in hbase-0.94 interface * @throws IOException */ -// @Override + // @Override public void readFields(DataInput in) throws IOException { -// super.readFields(in); + // super.readFields(in); try { String _type = in.readUTF(); type = _primitiveTypeClassMap.get(_type); - if(type == null) { + if (type == null) { type = Class.forName(_type); } comparator = get(type); - if(comparator == null) throw new IllegalArgumentException("No comparator found for class: "+type); + if (comparator == null) { + throw new IllegalArgumentException("No comparator found for class: " + type); + } } catch (ClassNotFoundException e) { - throw new IOException(e.getMessage(),e); + throw new IOException(e.getMessage(), e); } } @@ -104,9 +105,9 @@ public class TypedByteArrayComparator extends ByteArrayComparable { * @param out hbase-0.94 interface * @throws IOException */ -// @Override + // @Override public void write(DataOutput out) throws IOException { -// 
super.write(out); + // super.write(out); String typeName = type.getName(); out.writeUTF(typeName); } @@ -123,7 +124,7 @@ public class TypedByteArrayComparator extends ByteArrayComparable { this.write(byteArrayDataOutput); return byteArrayDataOutput.toByteArray(); } catch (IOException e) { - LOG.error("Failed to serialize due to: "+e.getMessage(),e); + LOG.error("Failed to serialize due to: " + e.getMessage(), e); throw new RuntimeException(e); } } @@ -135,14 +136,13 @@ public class TypedByteArrayComparator extends ByteArrayComparable { * @return Comparator instance * @throws DeserializationException */ - public static TypedByteArrayComparator parseFrom(final byte [] bytes) - throws DeserializationException { + public static TypedByteArrayComparator parseFrom(final byte[] bytes) throws DeserializationException { TypedByteArrayComparator comparator = new TypedByteArrayComparator(); ByteArrayDataInput byteArrayDataInput = ByteStreams.newDataInput(bytes); try { comparator.readFields(byteArrayDataInput); } catch (IOException e) { - LOG.error("Got error to deserialize TypedByteArrayComparator from PB bytes",e); + LOG.error("Got error to deserialize TypedByteArrayComparator from PB bytes", e); throw new DeserializationException(e); } return comparator; @@ -158,33 +158,35 @@ public class TypedByteArrayComparator extends ByteArrayComparable { * <li>Try registered comparator</li> * <li>If not found, try all possible WritableComparator</li> * </ol> - * * If not found finally, throw new IllegalArgumentException("unable to get comparator for class: "+type); * * @param type value type class * @return RawComparator */ - public static RawComparator get(Class type){ + public static RawComparator get(Class type) { RawComparator comparator = null; try { comparator = _typedClassComparator.get(type); - }catch (ClassCastException ex){ + } catch (ClassCastException ex) { // ignore } try { - if (comparator == null) comparator = WritableComparator.get(type); - }catch (ClassCastException ex){ + if (comparator == null) { + comparator = WritableComparator.get(type); + } + } catch (ClassCastException ex) { // ignore } return comparator; } - private final static Map<Class,RawComparator> _typedClassComparator = new HashMap<Class, RawComparator>(); - public static void define(Class type, RawComparator comparator){ - _typedClassComparator.put(type,comparator); + private static final Map<Class, RawComparator> _typedClassComparator = new HashMap<Class, RawComparator>(); + + public static void define(Class type, RawComparator comparator) { + _typedClassComparator.put(type, comparator); } - static{ + static { define(Double.class, WritableComparator.get(DoubleWritable.class)); define(double.class, WritableComparator.get(DoubleWritable.class)); define(Integer.class, WritableComparator.get(IntWritable.class)); @@ -200,14 +202,15 @@ public class TypedByteArrayComparator extends ByteArrayComparable { /** * Because {@link Class#forName } can't find class for primitive type */ - private final static Map<String,Class> _primitiveTypeClassMap = new HashMap<String, Class>(); + private static final Map<String, Class> _primitiveTypeClassMap = new HashMap<String, Class>(); + static { - _primitiveTypeClassMap.put(int.class.getName(),int.class); - _primitiveTypeClassMap.put(double.class.getName(),double.class); - _primitiveTypeClassMap.put(long.class.getName(),long.class); - _primitiveTypeClassMap.put(short.class.getName(),short.class); - _primitiveTypeClassMap.put(boolean.class.getName(),boolean.class); - 
_primitiveTypeClassMap.put(char.class.getName(),char.class); - _primitiveTypeClassMap.put(byte.class.getName(),byte.class); + _primitiveTypeClassMap.put(int.class.getName(), int.class); + _primitiveTypeClassMap.put(double.class.getName(), double.class); + _primitiveTypeClassMap.put(long.class.getName(), long.class); + _primitiveTypeClassMap.put(short.class.getName(), short.class); + _primitiveTypeClassMap.put(boolean.class.getName(), boolean.class); + _primitiveTypeClassMap.put(char.class.getName(), char.class); + _primitiveTypeClassMap.put(byte.class.getName(), byte.class); } -} \ No newline at end of file +} http://git-wip-us.apache.org/repos/asf/eagle/blob/6e919c2e/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/index/IndexLogReader.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/index/IndexLogReader.java b/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/index/IndexLogReader.java index 418ab33..7a32077 100755 --- a/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/index/IndexLogReader.java +++ b/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/index/IndexLogReader.java @@ -24,22 +24,24 @@ import org.apache.hadoop.hbase.filter.SingleColumnValueFilter; public abstract class IndexLogReader implements LogReader { - // TODO: Work around https://issues.apache.org/jira/browse/HBASE-2198. More graceful implementation should use SingleColumnValueExcludeFilter, - // but it's complicated in current implementation. - protected static void workaroundHBASE2198(Get get, Filter filter,byte[][] qualifiers) { - if (filter instanceof SingleColumnValueFilter) { - if(qualifiers == null) { - get.addFamily(((SingleColumnValueFilter) filter).getFamily()); - }else{ - get.addColumn(((SingleColumnValueFilter) filter).getFamily(), ((SingleColumnValueFilter) filter).getQualifier()); - } - return; - } - if (filter instanceof FilterList) { - for (Filter f : ((FilterList)filter).getFilters()) { - workaroundHBASE2198(get, f,qualifiers); - } - } - } + // TODO: Work around https://issues.apache.org/jira/browse/HBASE-2198. More graceful implementation should + // use SingleColumnValueExcludeFilter, + // but it's complicated in current implementation. 
+ protected static void workaroundHBASE2198(Get get, Filter filter, byte[][] qualifiers) { + if (filter instanceof SingleColumnValueFilter) { + if (qualifiers == null) { + get.addFamily(((SingleColumnValueFilter)filter).getFamily()); + } else { + get.addColumn(((SingleColumnValueFilter)filter).getFamily(), + ((SingleColumnValueFilter)filter).getQualifier()); + } + return; + } + if (filter instanceof FilterList) { + for (Filter f : ((FilterList)filter).getFilters()) { + workaroundHBASE2198(get, f, qualifiers); + } + } + } } http://git-wip-us.apache.org/repos/asf/eagle/blob/6e919c2e/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/index/IndexStreamReader.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/index/IndexStreamReader.java b/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/index/IndexStreamReader.java index 9e059f2..579755f 100755 --- a/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/index/IndexStreamReader.java +++ b/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/index/IndexStreamReader.java @@ -26,69 +26,69 @@ import org.slf4j.LoggerFactory; import java.io.IOException; import java.util.List; -public abstract class IndexStreamReader extends StreamReader { - protected final IndexDefinition indexDef; - protected final SearchCondition condition; - protected final List<byte[]> indexRowkeys; - protected LogReader<InternalLog> reader; - protected long lastTimestamp = 0; - protected long firstTimestamp = 0; - - protected static final Logger LOG = LoggerFactory.getLogger(IndexStreamReader.class); +public abstract class IndexStreamReader extends StreamReader { + protected final IndexDefinition indexDef; + protected final SearchCondition condition; + protected final List<byte[]> indexRowkeys; + protected LogReader<InternalLog> reader; + protected long lastTimestamp = 0; + protected long firstTimestamp = 0; - public IndexStreamReader(IndexDefinition indexDef, SearchCondition condition, List<byte[]> indexRowkeys) { - this.indexDef = indexDef; - this.condition = condition; - this.indexRowkeys = indexRowkeys; - this.reader = null; - } + protected static final Logger LOG = LoggerFactory.getLogger(IndexStreamReader.class); - @Override - public long getLastTimestamp() { - return lastTimestamp; - } + public IndexStreamReader(IndexDefinition indexDef, SearchCondition condition, List<byte[]> indexRowkeys) { + this.indexDef = indexDef; + this.condition = condition; + this.indexRowkeys = indexRowkeys; + this.reader = null; + } - @Override - public long getFirstTimestamp() { - return this.firstTimestamp; - } + @Override + public long getLastTimestamp() { + return lastTimestamp; + } - @Override - public void readAsStream() throws Exception { - if (reader == null) { - reader = createIndexReader(); - } - final EntityDefinition entityDef = indexDef.getEntityDefinition(); - try{ - reader.open(); - InternalLog log; - int count = 0; - while ((log = reader.read()) != null) { - TaggedLogAPIEntity entity = HBaseInternalLogHelper.buildEntity(log, entityDef); - entity.setSerializeAlias(condition.getOutputAlias()); - entity.setSerializeVerbose(condition.isOutputVerbose()); + @Override + public long getFirstTimestamp() { + return this.firstTimestamp; + } - if (lastTimestamp == 0 || lastTimestamp < entity.getTimestamp()) { - lastTimestamp = entity.getTimestamp(); 
- } - if(firstTimestamp == 0 || firstTimestamp > entity.getTimestamp()){ - firstTimestamp = entity.getTimestamp(); - } - for(EntityCreationListener l : _listeners){ - l.entityCreated(entity); - } - if(++count == condition.getPageSize()) { - break; - } - } - }catch(IOException ioe){ - LOG.error("Fail reading log", ioe); - throw ioe; - }finally{ - reader.close(); - } - } + @Override + public void readAsStream() throws Exception { + if (reader == null) { + reader = createIndexReader(); + } + final EntityDefinition entityDef = indexDef.getEntityDefinition(); + try { + reader.open(); + InternalLog log; + int count = 0; + while ((log = reader.read()) != null) { + TaggedLogAPIEntity entity = HBaseInternalLogHelper.buildEntity(log, entityDef); + entity.setSerializeAlias(condition.getOutputAlias()); + entity.setSerializeVerbose(condition.isOutputVerbose()); + + if (lastTimestamp == 0 || lastTimestamp < entity.getTimestamp()) { + lastTimestamp = entity.getTimestamp(); + } + if (firstTimestamp == 0 || firstTimestamp > entity.getTimestamp()) { + firstTimestamp = entity.getTimestamp(); + } + for (EntityCreationListener l : listeners) { + l.entityCreated(entity); + } + if (++count == condition.getPageSize()) { + break; + } + } + } catch (IOException ioe) { + LOG.error("Fail reading log", ioe); + throw ioe; + } finally { + reader.close(); + } + } + + protected abstract LogReader createIndexReader(); - protected abstract LogReader createIndexReader(); - } http://git-wip-us.apache.org/repos/asf/eagle/blob/6e919c2e/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/index/NonClusteredIndexLogReader.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/index/NonClusteredIndexLogReader.java b/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/index/NonClusteredIndexLogReader.java index e6a5c96..4daf695 100755 --- a/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/index/NonClusteredIndexLogReader.java +++ b/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/index/NonClusteredIndexLogReader.java @@ -28,170 +28,172 @@ import java.io.IOException; import java.util.ArrayList; import java.util.List; - public class NonClusteredIndexLogReader extends IndexLogReader { - private final IndexDefinition indexDef; - private final List<byte[]> indexRowkeys; - private final byte[][] qualifiers; - private final Filter filter; - private HTableInterface tbl; - private boolean isOpen = false; - private Result[] results; - private int index = -1; - private final List<Scan> scans; - private int currentScanIndex = 0; - private ResultScanner currentResultScanner; - - // Max tag key/value. 
- private static final byte[] MAX_TAG_VALUE_BYTES = {(byte) 0XFF,(byte) 0XFF,(byte) 0XFF,(byte) 0XFF,(byte) 0XFF,(byte) 0XFF,(byte) 0XFF,(byte) 0XFF,(byte) 0XFF}; - private static final int BATCH_MULTIGET_SIZE = 1000; - - public NonClusteredIndexLogReader(IndexDefinition indexDef, List<byte[]> indexRowkeys, byte[][] qualifiers, Filter filter) { - this.indexDef = indexDef; - this.indexRowkeys = indexRowkeys; - this.qualifiers = qualifiers; - this.filter = filter; - this.scans = buildScans(); - } - - - private List<Scan> buildScans() { - final ArrayList<Scan> result = new ArrayList<Scan>(indexRowkeys.size()); - for (byte[] rowkey : indexRowkeys) { - Scan s = new Scan(); - s.setStartRow(rowkey); - // In rowkey the tag key/value is sorted by the hash code of the key, so MAX_TAG_VALUE_BYTES is enough as the end key - final byte[] stopRowkey = ByteUtil.concat(rowkey, MAX_TAG_VALUE_BYTES); - s.setStopRow(stopRowkey); - // TODO the # of cached rows should be minimum of (pagesize and 100) - int cs = EagleConfigFactory.load().getHBaseClientScanCacheSize(); - s.setCaching(cs); - // TODO not optimized for all applications - s.setCacheBlocks(true); - // scan specified columnfamily for all qualifiers - s.addFamily(indexDef.getEntityDefinition().getColumnFamily().getBytes()); - result.add(s); - } - return result; - } - - @Override - public void open() throws IOException { - if (isOpen) - return; // silently return - try { - tbl = EagleConfigFactory.load().getHTable(indexDef.getEntityDefinition().getTable()); - } catch (RuntimeException ex) { - throw new IOException(ex); - } - currentScanIndex = 0; - openNewScan(); - fillResults(); - } - - private boolean openNewScan() throws IOException { - closeCurrentScanResult(); - if (currentScanIndex >= scans.size()) { - return false; - } - final Scan scan = scans.get(currentScanIndex++); - currentResultScanner = tbl.getScanner(scan); - return true; - } - - private void fillResults() throws IOException { - if (currentResultScanner == null) { - return; - } - index = 0; - int count = 0; - Result r = null; + private final IndexDefinition indexDef; + private final List<byte[]> indexRowkeys; + private final byte[][] qualifiers; + private final Filter filter; + private HTableInterface tbl; + private boolean isOpen = false; + private Result[] results; + private int index = -1; + private final List<Scan> scans; + private int currentScanIndex = 0; + private ResultScanner currentResultScanner; + + // Max tag key/value. 
+ private static final byte[] MAX_TAG_VALUE_BYTES = { + (byte)0XFF, (byte)0XFF, (byte)0XFF, (byte)0XFF, + (byte)0XFF, (byte)0XFF, (byte)0XFF, (byte)0XFF, + (byte)0XFF + }; + private static final int BATCH_MULTIGET_SIZE = 1000; + + public NonClusteredIndexLogReader(IndexDefinition indexDef, List<byte[]> indexRowkeys, + byte[][] qualifiers, Filter filter) { + this.indexDef = indexDef; + this.indexRowkeys = indexRowkeys; + this.qualifiers = qualifiers; + this.filter = filter; + this.scans = buildScans(); + } + + private List<Scan> buildScans() { + final ArrayList<Scan> result = new ArrayList<Scan>(indexRowkeys.size()); + for (byte[] rowkey : indexRowkeys) { + Scan s = new Scan(); + s.setStartRow(rowkey); + // In rowkey the tag key/value is sorted by the hash code of the key, so MAX_TAG_VALUE_BYTES is + // enough as the end key + final byte[] stopRowkey = ByteUtil.concat(rowkey, MAX_TAG_VALUE_BYTES); + s.setStopRow(stopRowkey); + // TODO the # of cached rows should be minimum of (pagesize and 100) + int cs = EagleConfigFactory.load().getHBaseClientScanCacheSize(); + s.setCaching(cs); + // TODO not optimized for all applications + s.setCacheBlocks(true); + // scan specified columnfamily for all qualifiers + s.addFamily(indexDef.getEntityDefinition().getColumnFamily().getBytes()); + result.add(s); + } + return result; + } + + @Override + public void open() throws IOException { + if (isOpen) { + return; // silently return + } + try { + tbl = EagleConfigFactory.load().getHTable(indexDef.getEntityDefinition().getTable()); + } catch (RuntimeException ex) { + throw new IOException(ex); + } + currentScanIndex = 0; + openNewScan(); + fillResults(); + } + + private boolean openNewScan() throws IOException { + closeCurrentScanResult(); + if (currentScanIndex >= scans.size()) { + return false; + } + final Scan scan = scans.get(currentScanIndex++); + currentResultScanner = tbl.getScanner(scan); + return true; + } + + private void fillResults() throws IOException { + if (currentResultScanner == null) { + return; + } + index = 0; + int count = 0; + Result r = null; final List<Get> gets = new ArrayList<Get>(BATCH_MULTIGET_SIZE); - final byte[] family = indexDef.getEntityDefinition().getColumnFamily().getBytes(); - while (count < BATCH_MULTIGET_SIZE) { - r = currentResultScanner.next(); - if (r == null) { - if (openNewScan()) { - continue; - } else { - break; - } - } - for (byte[] rowkey : r.getFamilyMap(family).keySet()) { - if (rowkey.length == 0) { // invalid rowkey - continue; - } - final Get get = new Get(rowkey); + final byte[] family = indexDef.getEntityDefinition().getColumnFamily().getBytes(); + while (count < BATCH_MULTIGET_SIZE) { + r = currentResultScanner.next(); + if (r == null) { + if (openNewScan()) { + continue; + } else { + break; + } + } + for (byte[] rowkey : r.getFamilyMap(family).keySet()) { + if (rowkey.length == 0) { // invalid rowkey + continue; + } + final Get get = new Get(rowkey); if (filter != null) { - get.setFilter(filter); + get.setFilter(filter); } - if(qualifiers != null) { - for (int j = 0; j < qualifiers.length; ++j) { - // Return the specified qualifiers - get.addColumn(family, qualifiers[j]); - } - }else { - get.addFamily(family); - } - workaroundHBASE2198(get, filter,qualifiers); - gets.add(get); - ++count; - } - } - if (count == 0) { - results = null; - return; - } - results = tbl.get(gets); - if (results == null || results.length == 0) { - fillResults(); - } - } - - - private void closeCurrentScanResult() { - if (currentResultScanner != null) { - 
currentResultScanner.close(); - currentResultScanner = null; - } - } - - - @Override - public void close() throws IOException { - if(tbl != null){ - new HTableFactory().releaseHTableInterface(tbl); - } - closeCurrentScanResult(); - } - - @Override - public InternalLog read() throws IOException { - if (tbl == null) { - throw new IllegalArgumentException("Haven't open before reading"); - } - - Result r = null; - InternalLog t = null; - while ((r = getNextResult()) != null) { - if (r.getRow() == null) { - continue; - } - t = HBaseInternalLogHelper.parse(indexDef.getEntityDefinition(), r, qualifiers); - break; - } - return t; - } - + if (qualifiers != null) { + for (int j = 0; j < qualifiers.length; ++j) { + // Return the specified qualifiers + get.addColumn(family, qualifiers[j]); + } + } else { + get.addFamily(family); + } + workaroundHBASE2198(get, filter, qualifiers); + gets.add(get); + ++count; + } + } + if (count == 0) { + results = null; + return; + } + results = tbl.get(gets); + if (results == null || results.length == 0) { + fillResults(); + } + } + + private void closeCurrentScanResult() { + if (currentResultScanner != null) { + currentResultScanner.close(); + currentResultScanner = null; + } + } + + @Override + public void close() throws IOException { + if (tbl != null) { + new HTableFactory().releaseHTableInterface(tbl); + } + closeCurrentScanResult(); + } + + @Override + public InternalLog read() throws IOException { + if (tbl == null) { + throw new IllegalArgumentException("Haven't open before reading"); + } + + Result r = null; + InternalLog t = null; + while ((r = getNextResult()) != null) { + if (r.getRow() == null) { + continue; + } + t = HBaseInternalLogHelper.parse(indexDef.getEntityDefinition(), r, qualifiers); + break; + } + return t; + } + + private Result getNextResult() throws IOException { + if (results == null || results.length == 0 || index >= results.length) { + fillResults(); + } + if (results == null || results.length == 0 || index >= results.length) { + return null; + } + return results[index++]; + } - private Result getNextResult() throws IOException { - if (results == null || results.length == 0 || index >= results.length) { - fillResults(); - } - if (results == null || results.length == 0 || index >= results.length) { - return null; - } - return results[index++]; - } - } http://git-wip-us.apache.org/repos/asf/eagle/blob/6e919c2e/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/index/NonClusteredIndexStreamReader.java ---------------------------------------------------------------------- diff --git a/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/index/NonClusteredIndexStreamReader.java b/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/index/NonClusteredIndexStreamReader.java index ec5631a..8df2773 100755 --- a/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/index/NonClusteredIndexStreamReader.java +++ b/eagle-core/eagle-query/eagle-entity-base/src/main/java/org/apache/eagle/log/entity/index/NonClusteredIndexStreamReader.java @@ -27,25 +27,29 @@ import java.util.ArrayList; import java.util.List; public class NonClusteredIndexStreamReader extends IndexStreamReader { - public NonClusteredIndexStreamReader(IndexDefinition indexDef, SearchCondition condition) { - super(indexDef, condition, new ArrayList<byte[]>()); - final IndexType type = indexDef.canGoThroughIndex(condition.getQueryExpression(), indexRowkeys); - if 
(!IndexType.NON_CLUSTER_INDEX.equals(type)) { - throw new IllegalArgumentException("This query can't go through index: " + condition.getQueryExpression()); - } - } + public NonClusteredIndexStreamReader(IndexDefinition indexDef, SearchCondition condition) { + super(indexDef, condition, new ArrayList<byte[]>()); + final IndexType type = indexDef.canGoThroughIndex(condition.getQueryExpression(), indexRowkeys); + if (!IndexType.NON_CLUSTER_INDEX.equals(type)) { + throw new IllegalArgumentException("This query can't go through index: " + + condition.getQueryExpression()); + } + } - public NonClusteredIndexStreamReader(IndexDefinition indexDef, SearchCondition condition, List<byte[]> indexRowkeys) { - super(indexDef, condition, indexRowkeys); - } + public NonClusteredIndexStreamReader(IndexDefinition indexDef, SearchCondition condition, + List<byte[]> indexRowkeys) { + super(indexDef, condition, indexRowkeys); + } - @Override - protected LogReader createIndexReader() { - final EntityDefinition entityDef = indexDef.getEntityDefinition(); - byte[][] outputQualifiers = null; - if(!condition.isOutputAll()) { - outputQualifiers = HBaseInternalLogHelper.getOutputQualifiers(entityDef, condition.getOutputFields()); - } - return new NonClusteredIndexLogReader(indexDef, indexRowkeys, outputQualifiers, condition.getFilter()); - } + @Override + protected LogReader createIndexReader() { + final EntityDefinition entityDef = indexDef.getEntityDefinition(); + byte[][] outputQualifiers = null; + if (!condition.isOutputAll()) { + outputQualifiers = HBaseInternalLogHelper.getOutputQualifiers(entityDef, + condition.getOutputFields()); + } + return new NonClusteredIndexLogReader(indexDef, indexRowkeys, outputQualifiers, + condition.getFilter()); + } }
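
For reference, a minimal sketch of how the comparator registry reformatted in TypedByteArrayComparator above is meant to be used. The Float registration and the sample values are illustrative assumptions, not part of the commit; Double/double are already covered by the static block shown in the hunk.

    import org.apache.hadoop.hbase.util.Bytes;
    import org.apache.hadoop.io.FloatWritable;
    import org.apache.hadoop.io.RawComparator;
    import org.apache.hadoop.io.WritableComparator;

    public class ComparatorRegistryExample {
        public static void main(String[] args) {
            // Double.class is pre-registered by the static block, mapped to DoubleWritable's comparator.
            RawComparator doubleCmp = TypedByteArrayComparator.get(Double.class);
            byte[] a = Bytes.toBytes(1.5d); // same big-endian layout DoubleWritable serializes to
            byte[] b = Bytes.toBytes(2.5d);
            System.out.println(doubleCmp.compare(a, 0, a.length, b, 0, b.length)); // negative: 1.5 < 2.5

            // A boxed type the static block does not cover can be added with define(),
            // mirroring the existing Double -> DoubleWritable mapping.
            TypedByteArrayComparator.define(Float.class, WritableComparator.get(FloatWritable.class));
            RawComparator floatCmp = TypedByteArrayComparator.get(Float.class);
            byte[] x = Bytes.toBytes(3.0f);
            byte[] y = Bytes.toBytes(3.0f);
            System.out.println(floatCmp.compare(x, 0, x.length, y, 0, y.length)); // 0: equal
        }
    }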

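Likewise, the bounded index scan assembled in NonClusteredIndexLogReader.buildScans() reduces to the pattern below. This is a self-contained sketch, not the committed code: the rowkey, column family, and cache-size parameters are hypothetical, and Bytes.add stands in for Eagle's ByteUtil.concat used in the hunk.

    import org.apache.hadoop.hbase.client.Scan;
    import org.apache.hadoop.hbase.util.Bytes;

    public class IndexScanExample {
        // Padding appended to the index rowkey so the stop row sorts after every tag suffix.
        private static final byte[] MAX_TAG_VALUE_BYTES = {
            (byte) 0xFF, (byte) 0xFF, (byte) 0xFF, (byte) 0xFF,
            (byte) 0xFF, (byte) 0xFF, (byte) 0xFF, (byte) 0xFF, (byte) 0xFF
        };

        public static Scan buildBoundedScan(byte[] indexRowkey, byte[] columnFamily, int cacheSize) {
            Scan s = new Scan();
            s.setStartRow(indexRowkey);
            // Stop row = start row + 0xFF padding, covering all entries under this index key.
            s.setStopRow(Bytes.add(indexRowkey, MAX_TAG_VALUE_BYTES));
            s.setCaching(cacheSize);   // rows fetched per RPC
            s.setCacheBlocks(true);    // matches the TODO'd default in the hunk
            s.addFamily(columnFamily); // scan the index column family for all qualifiers
            return s;
        }
    }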