http://git-wip-us.apache.org/repos/asf/hive/blob/5e16d53e/metastore/src/java/org/apache/hadoop/hive/metastore/hbase/HBaseFilterPlanUtil.java ---------------------------------------------------------------------- diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/hbase/HBaseFilterPlanUtil.java b/metastore/src/java/org/apache/hadoop/hive/metastore/hbase/HBaseFilterPlanUtil.java index ec99685..9762309 100644 --- a/metastore/src/java/org/apache/hadoop/hive/metastore/hbase/HBaseFilterPlanUtil.java +++ b/metastore/src/java/org/apache/hadoop/hive/metastore/hbase/HBaseFilterPlanUtil.java @@ -20,15 +20,30 @@ package org.apache.hadoop.hive.metastore.hbase; import java.util.ArrayList; import java.util.Arrays; +import java.util.HashMap; +import java.util.HashSet; import java.util.IdentityHashMap; import java.util.List; import java.util.Map; +import java.util.Set; +import org.apache.commons.lang.StringUtils; +import org.apache.hadoop.hbase.filter.CompareFilter; +import org.apache.hadoop.hbase.filter.Filter; +import org.apache.hadoop.hbase.filter.RowFilter; +import org.apache.hadoop.hive.metastore.api.FieldSchema; import org.apache.hadoop.hive.metastore.api.MetaException; +import org.apache.hadoop.hive.metastore.hbase.PartitionKeyComparator.Operator; import org.apache.hadoop.hive.metastore.parser.ExpressionTree; import org.apache.hadoop.hive.metastore.parser.ExpressionTree.LeafNode; import org.apache.hadoop.hive.metastore.parser.ExpressionTree.TreeNode; import org.apache.hadoop.hive.metastore.parser.ExpressionTree.TreeVisitor; +import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector; +import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorConverters; +import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorConverters.Converter; +import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory; +import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo; +import 
org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils; import com.google.common.annotations.VisibleForTesting; import com.google.common.collect.ImmutableList; @@ -147,7 +162,7 @@ class HBaseFilterPlanUtil { public static class ScanPlan extends FilterPlan { public static class ScanMarker { - final byte[] bytes; + final String value; /** * If inclusive = true, it means that the * marker includes those bytes. @@ -155,20 +170,24 @@ class HBaseFilterPlanUtil { * or ends at the next possible byte array */ final boolean isInclusive; - ScanMarker(byte [] b, boolean i){ - this.bytes = b; + final String type; + ScanMarker(String obj, boolean i, String type){ + this.value = obj; this.isInclusive = i; + this.type = type; } @Override public String toString() { - return "ScanMarker [bytes=" + Arrays.toString(bytes) + ", isInclusive=" + isInclusive + "]"; + return "ScanMarker [" + "value=" + value.toString() + ", isInclusive=" + isInclusive + + ", type=" + type + "]"; } @Override public int hashCode() { final int prime = 31; int result = 1; - result = prime * result + Arrays.hashCode(bytes); + result = prime * result + value.hashCode(); result = prime * result + (isInclusive ? 
1231 : 1237); + result = prime * result + type.hashCode(); return result; } @Override @@ -180,48 +199,118 @@ class HBaseFilterPlanUtil { if (getClass() != obj.getClass()) return false; ScanMarker other = (ScanMarker) obj; - if (!Arrays.equals(bytes, other.bytes)) + if (!value.equals(other.value)) return false; if (isInclusive != other.isInclusive) return false; + if (type != other.type) + return false; return true; } } - // represent Scan start - private ScanMarker startMarker = new ScanMarker(null, false); - // represent Scan end - private ScanMarker endMarker = new ScanMarker(null, false); - - private ScanFilter filter; - - public ScanFilter getFilter() { - return filter; + public static class ScanMarkerPair { + public ScanMarkerPair(ScanMarker startMarker, ScanMarker endMarker) { + this.startMarker = startMarker; + this.endMarker = endMarker; + } + ScanMarker startMarker; + ScanMarker endMarker; + } + // represent Scan start, partition key name -> scanMarkerPair + Map<String, ScanMarkerPair> markers = new HashMap<String, ScanMarkerPair>(); + List<Operator> ops = new ArrayList<Operator>(); + + // Get the number of partition key prefixes which can be used in the scan range. + // For example, if partition key is (year, month, state) + // 1. year = 2015 and month >= 1 and month < 5 + // year + month can be used in scan range, majorParts = 2 + // 2. year = 2015 and state = 'CA' + // only year can be used in scan range, majorParts = 1 + // 3. 
month = 10 and state = 'CA' + // nothing can be used in scan range, majorParts = 0 + private int getMajorPartsCount(List<FieldSchema> parts) { + int majorPartsCount = 0; + while (majorPartsCount<parts.size() && markers.containsKey(parts.get(majorPartsCount).getName())) { + ScanMarkerPair pair = markers.get(parts.get(majorPartsCount).getName()); + majorPartsCount++; + if (pair.startMarker!=null && pair.endMarker!=null && pair.startMarker.value.equals(pair + .endMarker.value) && pair.startMarker.isInclusive && pair.endMarker.isInclusive) { + // is equal + continue; + } else { + break; + } + } + return majorPartsCount; } + public Filter getFilter(List<FieldSchema> parts) { + int majorPartsCount = getMajorPartsCount(parts); + Set<String> majorKeys = new HashSet<String>(); + for (int i=0;i<majorPartsCount;i++) { + majorKeys.add(parts.get(i).getName()); + } - public void setFilter(ScanFilter filter) { - this.filter = filter; - } + List<String> names = HBaseUtils.getPartitionNames(parts); + List<PartitionKeyComparator.Range> ranges = new ArrayList<PartitionKeyComparator.Range>(); + for (Map.Entry<String, ScanMarkerPair> entry : markers.entrySet()) { + if (names.contains(entry.getKey()) && !majorKeys.contains(entry.getKey())) { + PartitionKeyComparator.Mark startMark = null; + if (entry.getValue().startMarker != null) { + startMark = new PartitionKeyComparator.Mark(entry.getValue().startMarker.value, + entry.getValue().startMarker.isInclusive); + } + PartitionKeyComparator.Mark endMark = null; + if (entry.getValue().endMarker != null) { + startMark = new PartitionKeyComparator.Mark(entry.getValue().endMarker.value, + entry.getValue().endMarker.isInclusive); + } + PartitionKeyComparator.Range range = new PartitionKeyComparator.Range( + entry.getKey(), startMark, endMark); + ranges.add(range); + } + } - public ScanMarker getStartMarker() { - return startMarker; + if (ranges.isEmpty() && ops.isEmpty()) { + return null; + } else { + return new 
RowFilter(CompareFilter.CompareOp.EQUAL, new PartitionKeyComparator( + StringUtils.join(names, ","), StringUtils.join(HBaseUtils.getPartitionKeyTypes(parts), ","), + ranges, ops)); + } } - public void setStartMarker(ScanMarker startMarker) { - this.startMarker = startMarker; - } - public void setStartMarker(byte[] start, boolean isInclusive) { - setStartMarker(new ScanMarker(start, isInclusive)); + public void setStartMarker(String keyName, String keyType, String start, boolean isInclusive) { + if (markers.containsKey(keyName)) { + markers.get(keyName).startMarker = new ScanMarker(start, isInclusive, keyType); + } else { + ScanMarkerPair marker = new ScanMarkerPair(new ScanMarker(start, isInclusive, keyType), null); + markers.put(keyName, marker); + } } - public ScanMarker getEndMarker() { - return endMarker; + public ScanMarker getStartMarker(String keyName) { + if (markers.containsKey(keyName)) { + return markers.get(keyName).startMarker; + } else { + return null; + } } - public void setEndMarker(ScanMarker endMarker) { - this.endMarker = endMarker; + public void setEndMarker(String keyName, String keyType, String end, boolean isInclusive) { + if (markers.containsKey(keyName)) { + markers.get(keyName).endMarker = new ScanMarker(end, isInclusive, keyType); + } else { + ScanMarkerPair marker = new ScanMarkerPair(null, new ScanMarker(end, isInclusive, keyType)); + markers.put(keyName, marker); + } } - public void setEndMarker(byte[] end, boolean isInclusive) { - setEndMarker(new ScanMarker(end, isInclusive)); + + public ScanMarker getEndMarker(String keyName) { + if (markers.containsKey(keyName)) { + return markers.get(keyName).endMarker; + } else { + return null; + } } @Override @@ -236,28 +325,33 @@ class HBaseFilterPlanUtil { private ScanPlan and(ScanPlan other) { // create combined FilterPlan based on existing lhs and rhs plan ScanPlan newPlan = new ScanPlan(); + newPlan.markers.putAll(markers); + + for (String keyName : other.markers.keySet()) { + if 
(newPlan.markers.containsKey(keyName)) { + // create new scan start + ScanMarker greaterStartMarker = getComparedMarker(this.getStartMarker(keyName), + other.getStartMarker(keyName), true); + if (greaterStartMarker != null) { + newPlan.setStartMarker(keyName, greaterStartMarker.type, greaterStartMarker.value, greaterStartMarker.isInclusive); + } + + // create new scan end + ScanMarker lesserEndMarker = getComparedMarker(this.getEndMarker(keyName), other.getEndMarker(keyName), + false); + if (lesserEndMarker != null) { + newPlan.setEndMarker(keyName, lesserEndMarker.type, lesserEndMarker.value, lesserEndMarker.isInclusive); + } + } else { + newPlan.markers.put(keyName, other.markers.get(keyName)); + } + } - // create new scan start - ScanMarker greaterStartMarker = getComparedMarker(this.getStartMarker(), - other.getStartMarker(), true); - newPlan.setStartMarker(greaterStartMarker); - - // create new scan end - ScanMarker lesserEndMarker = getComparedMarker(this.getEndMarker(), other.getEndMarker(), - false); - newPlan.setEndMarker(lesserEndMarker); - - // create new filter plan - newPlan.setFilter(createCombinedFilter(this.getFilter(), other.getFilter())); - + newPlan.ops.addAll(ops); + newPlan.ops.addAll(other.ops); return newPlan; } - private ScanFilter createCombinedFilter(ScanFilter filter1, ScanFilter filter2) { - // TODO create combined filter - filter1 && filter2 - return null; - } - /** * @param lStartMarker * @param rStartMarker @@ -268,13 +362,23 @@ class HBaseFilterPlanUtil { static ScanMarker getComparedMarker(ScanMarker lStartMarker, ScanMarker rStartMarker, boolean getGreater) { // if one of them has null bytes, just return other - if(lStartMarker.bytes == null) { + if(lStartMarker == null) { return rStartMarker; - } else if (rStartMarker.bytes == null) { + } else if (rStartMarker == null) { return lStartMarker; } - - int compareRes = compare(lStartMarker.bytes, rStartMarker.bytes); + TypeInfo expectedType = + 
TypeInfoUtils.getTypeInfoFromTypeString(lStartMarker.type); + ObjectInspector outputOI = + TypeInfoUtils.getStandardWritableObjectInspectorFromTypeInfo(expectedType); + Converter lConverter = ObjectInspectorConverters.getConverter( + PrimitiveObjectInspectorFactory.javaStringObjectInspector, outputOI); + Converter rConverter = ObjectInspectorConverters.getConverter( + PrimitiveObjectInspectorFactory.javaStringObjectInspector, outputOI); + Comparable lValue = (Comparable)lConverter.convert(lStartMarker.value); + Comparable rValue = (Comparable)rConverter.convert(rStartMarker.value); + + int compareRes = lValue.compareTo(rValue); if (compareRes == 0) { // bytes are equal, now compare the isInclusive flags if (lStartMarker.isInclusive == rStartMarker.isInclusive) { @@ -287,7 +391,7 @@ class HBaseFilterPlanUtil { isInclusive = false; } // else - return new ScanMarker(lStartMarker.bytes, isInclusive); + return new ScanMarker(lStartMarker.value, isInclusive, lStartMarker.type); } if (getGreater) { return compareRes == 1 ? 
lStartMarker : rStartMarker; @@ -313,42 +417,74 @@ class HBaseFilterPlanUtil { /** * @return row suffix - This is appended to db + table, to generate start row for the Scan */ - public byte[] getStartRowSuffix() { - if (startMarker.isInclusive) { - return startMarker.bytes; - } else { - return HBaseUtils.getEndPrefix(startMarker.bytes); + public byte[] getStartRowSuffix(String dbName, String tableName, List<FieldSchema> parts) { + int majorPartsCount = getMajorPartsCount(parts); + List<String> majorPartTypes = new ArrayList<String>(); + List<String> components = new ArrayList<String>(); + boolean endPrefix = false; + for (int i=0;i<majorPartsCount;i++) { + majorPartTypes.add(parts.get(i).getType()); + ScanMarker marker = markers.get(parts.get(i).getName()).startMarker; + if (marker != null) { + components.add(marker.value); + if (i==majorPartsCount-1) { + endPrefix = !marker.isInclusive; + } + } else { + components.add(null); + if (i==majorPartsCount-1) { + endPrefix = false; + } + } } + byte[] bytes = HBaseUtils.buildPartitionKey(dbName, tableName, majorPartTypes, components, endPrefix); + return bytes; } /** * @return row suffix - This is appended to db + table, to generate end row for the Scan */ - public byte[] getEndRowSuffix() { - if (endMarker.isInclusive) { - return HBaseUtils.getEndPrefix(endMarker.bytes); - } else { - return endMarker.bytes; + public byte[] getEndRowSuffix(String dbName, String tableName, List<FieldSchema> parts) { + int majorPartsCount = getMajorPartsCount(parts); + List<String> majorPartTypes = new ArrayList<String>(); + List<String> components = new ArrayList<String>(); + boolean endPrefix = false; + for (int i=0;i<majorPartsCount;i++) { + majorPartTypes.add(parts.get(i).getType()); + ScanMarker marker = markers.get(parts.get(i).getName()).endMarker; + if (marker != null) { + components.add(marker.value); + if (i==majorPartsCount-1) { + endPrefix = marker.isInclusive; + } + } else { + components.add(null); + if (i==majorPartsCount-1) { 
+ endPrefix = true; + } + } + } + byte[] bytes = HBaseUtils.buildPartitionKey(dbName, tableName, majorPartTypes, components, endPrefix); + if (components.isEmpty()) { + bytes[bytes.length-1]++; } + return bytes; } @Override public String toString() { - return "ScanPlan [startMarker=" + startMarker + ", endMarker=" + endMarker + ", filter=" - + filter + "]"; + StringBuffer sb = new StringBuffer(); + sb.append("ScanPlan:\n"); + for (Map.Entry<String, ScanMarkerPair> entry : markers.entrySet()) { + sb.append("key=" + entry.getKey() + "[startMarker=" + entry.getValue().startMarker + + ", endMarker=" + entry.getValue().endMarker + "]"); + } + return sb.toString(); } } /** - * represent a plan that can be used to create a hbase filter and then set in - * Scan.setFilter() - */ - public static class ScanFilter { - // TODO: implement this - } - - /** * Visitor for ExpressionTree. * It first generates the ScanPlan for the leaf nodes. The higher level nodes are * either AND or OR operations. It then calls FilterPlan.and and FilterPlan.or with @@ -369,9 +505,12 @@ class HBaseFilterPlanUtil { // temporary params for current left and right side plans, for AND, OR private FilterPlan rPlan; - private final String firstPartcolumn; - public PartitionFilterGenerator(String firstPartitionColumn) { - this.firstPartcolumn = firstPartitionColumn; + private Map<String, String> nameToType = new HashMap<String, String>(); + + public PartitionFilterGenerator(List<FieldSchema> parts) { + for (FieldSchema part : parts) { + nameToType.put(part.getName(), part.getType()); + } } FilterPlan getPlan() { @@ -414,63 +553,37 @@ class HBaseFilterPlanUtil { public void visit(LeafNode node) throws MetaException { ScanPlan leafPlan = new ScanPlan(); curPlan = leafPlan; - if (!isFirstParitionColumn(node.keyName)) { - leafPlan.setFilter(generateScanFilter(node)); - return; - } - if (!(node.value instanceof String)) { - // only string type is supported currently - // treat conditions on other types as true - 
return; - } // this is a condition on first partition column, so might influence the // start and end of the scan final boolean INCLUSIVE = true; switch (node.operator) { case EQUALS: - leafPlan.setStartMarker(toBytes(node.value), INCLUSIVE); - leafPlan.setEndMarker(toBytes(node.value), INCLUSIVE); + leafPlan.setStartMarker(node.keyName, nameToType.get(node.keyName), node.value.toString(), INCLUSIVE); + leafPlan.setEndMarker(node.keyName, nameToType.get(node.keyName), node.value.toString(), INCLUSIVE); break; case GREATERTHAN: - leafPlan.setStartMarker(toBytes(node.value), !INCLUSIVE); + leafPlan.setStartMarker(node.keyName, nameToType.get(node.keyName), node.value.toString(), !INCLUSIVE); break; case GREATERTHANOREQUALTO: - leafPlan.setStartMarker(toBytes(node.value), INCLUSIVE); + leafPlan.setStartMarker(node.keyName, nameToType.get(node.keyName), node.value.toString(), INCLUSIVE); break; case LESSTHAN: - leafPlan.setEndMarker(toBytes(node.value), !INCLUSIVE); + leafPlan.setEndMarker(node.keyName, nameToType.get(node.keyName), node.value.toString(), !INCLUSIVE); break; case LESSTHANOREQUALTO: - leafPlan.setEndMarker(toBytes(node.value), INCLUSIVE); + leafPlan.setEndMarker(node.keyName, nameToType.get(node.keyName), node.value.toString(), INCLUSIVE); break; case LIKE: + leafPlan.ops.add(new Operator(Operator.Type.LIKE, node.keyName, node.value.toString())); + break; case NOTEQUALS: case NOTEQUALS2: - // TODO: create filter plan for these - hasUnsupportedCondition = true; + leafPlan.ops.add(new Operator(Operator.Type.NOTEQUALS, node.keyName, node.value.toString())); break; } } - @VisibleForTesting - static byte[] toBytes(Object value) { - // TODO: actually implement this - // We need to determine the actual type and use appropriate - // serialization format for that type - return ((String) value).getBytes(HBaseUtils.ENCODING); - } - - private ScanFilter generateScanFilter(LeafNode node) { - // TODO Auto-generated method stub - hasUnsupportedCondition = true; - 
return null; - } - - private boolean isFirstParitionColumn(String keyName) { - return keyName.equalsIgnoreCase(firstPartcolumn); - } - private boolean hasUnsupportedCondition() { return hasUnsupportedCondition; } @@ -486,12 +599,12 @@ class HBaseFilterPlanUtil { } } - public static PlanResult getFilterPlan(ExpressionTree exprTree, String firstPartitionColumn) throws MetaException { + public static PlanResult getFilterPlan(ExpressionTree exprTree, List<FieldSchema> parts) throws MetaException { if (exprTree == null) { // TODO: if exprTree is null, we should do what ObjectStore does. See HIVE-10102 return new PlanResult(new ScanPlan(), true); } - PartitionFilterGenerator pGenerator = new PartitionFilterGenerator(firstPartitionColumn); + PartitionFilterGenerator pGenerator = new PartitionFilterGenerator(parts); exprTree.accept(pGenerator); return new PlanResult(pGenerator.getPlan(), pGenerator.hasUnsupportedCondition()); }
http://git-wip-us.apache.org/repos/asf/hive/blob/5e16d53e/metastore/src/java/org/apache/hadoop/hive/metastore/hbase/HBaseReadWrite.java ---------------------------------------------------------------------- diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/hbase/HBaseReadWrite.java b/metastore/src/java/org/apache/hadoop/hive/metastore/hbase/HBaseReadWrite.java index ca1582e..66c46a5 100644 --- a/metastore/src/java/org/apache/hadoop/hive/metastore/hbase/HBaseReadWrite.java +++ b/metastore/src/java/org/apache/hadoop/hive/metastore/hbase/HBaseReadWrite.java @@ -19,8 +19,10 @@ package org.apache.hadoop.hive.metastore.hbase; import com.google.common.annotations.VisibleForTesting; + import org.apache.commons.codec.binary.Base64; import org.apache.commons.lang.ArrayUtils; +import org.apache.commons.lang.StringUtils; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; @@ -43,6 +45,7 @@ import org.apache.hadoop.hive.metastore.api.ColumnStatistics; import org.apache.hadoop.hive.metastore.api.ColumnStatisticsDesc; import org.apache.hadoop.hive.metastore.api.ColumnStatisticsObj; import org.apache.hadoop.hive.metastore.api.Database; +import org.apache.hadoop.hive.metastore.api.FieldSchema; import org.apache.hadoop.hive.metastore.api.Function; import org.apache.hadoop.hive.metastore.api.NoSuchObjectException; import org.apache.hadoop.hive.metastore.api.Partition; @@ -51,6 +54,7 @@ import org.apache.hadoop.hive.metastore.api.PrincipalType; import org.apache.hadoop.hive.metastore.api.Role; import org.apache.hadoop.hive.metastore.api.StorageDescriptor; import org.apache.hadoop.hive.metastore.api.Table; +import org.apache.hadoop.hive.metastore.hbase.PartitionKeyComparator.Operator; import org.apache.hive.common.util.BloomFilter; import java.io.IOException; @@ -493,12 +497,12 @@ public class HBaseReadWrite { * @return a list of partition objects. 
* @throws IOException */ - List<Partition> getPartitions(String dbName, String tableName, List<List<String>> partValLists) - throws IOException { + List<Partition> getPartitions(String dbName, String tableName, List<String> partTypes, + List<List<String>> partValLists) throws IOException { List<Partition> parts = new ArrayList<>(partValLists.size()); List<Get> gets = new ArrayList<>(partValLists.size()); for (List<String> partVals : partValLists) { - byte[] key = HBaseUtils.buildPartitionKey(dbName, tableName, partVals); + byte[] key = HBaseUtils.buildPartitionKey(dbName, tableName, partTypes, partVals); Get get = new Get(key); get.addColumn(CATALOG_CF, CATALOG_COL); gets.add(get); @@ -526,7 +530,8 @@ public class HBaseReadWrite { */ void putPartition(Partition partition) throws IOException { byte[] hash = putStorageDescriptor(partition.getSd()); - byte[][] serialized = HBaseUtils.serializePartition(partition, hash); + byte[][] serialized = HBaseUtils.serializePartition(partition, + HBaseUtils.getPartitionKeyTypes(getTable(partition.getDbName(), partition.getTableName()).getPartitionKeys()), hash); store(PART_TABLE, serialized[0], CATALOG_CF, CATALOG_COL, serialized[1]); partCache.put(partition.getDbName(), partition.getTableName(), partition); } @@ -547,7 +552,8 @@ public class HBaseReadWrite { decrementStorageDescriptorRefCount(oldPart.getSd()); hash = putStorageDescriptor(newPart.getSd()); } - byte[][] serialized = HBaseUtils.serializePartition(newPart, hash); + byte[][] serialized = HBaseUtils.serializePartition(newPart, + HBaseUtils.getPartitionKeyTypes(getTable(newPart.getDbName(), newPart.getTableName()).getPartitionKeys()), hash); store(PART_TABLE, serialized[0], CATALOG_CF, CATALOG_COL, serialized[1]); partCache.put(newPart.getDbName(), newPart.getTableName(), newPart); if (!oldPart.getTableName().equals(newPart.getTableName())) { @@ -565,7 +571,9 @@ public class HBaseReadWrite { List<Put> puts = new ArrayList<>(partitions.size()); for (Partition partition 
: partitions) { byte[] hash = putStorageDescriptor(partition.getSd()); - byte[][] serialized = HBaseUtils.serializePartition(partition, hash); + List<String> partTypes = HBaseUtils.getPartitionKeyTypes( + getTable(partition.getDbName(), partition.getTableName()).getPartitionKeys()); + byte[][] serialized = HBaseUtils.serializePartition(partition, partTypes, hash); Put p = new Put(serialized[0]); p.add(CATALOG_CF, CATALOG_COL, serialized[1]); puts.add(p); @@ -591,7 +599,9 @@ public class HBaseReadWrite { decrementStorageDescriptorRefCount(oldParts.get(i).getSd()); hash = putStorageDescriptor(newParts.get(i).getSd()); } - byte[][] serialized = HBaseUtils.serializePartition(newParts.get(i), hash); + Partition newPart = newParts.get(i); + byte[][] serialized = HBaseUtils.serializePartition(newPart, + HBaseUtils.getPartitionKeyTypes(getTable(newPart.getDbName(), newPart.getTableName()).getPartitionKeys()), hash); Put p = new Put(serialized[0]); p.add(CATALOG_CF, CATALOG_COL, serialized[1]); puts.add(p); @@ -624,8 +634,9 @@ public class HBaseReadWrite { ? new ArrayList<>(cached).subList(0, maxPartitions) : new ArrayList<>(cached); } - byte[] keyPrefix = HBaseUtils.buildKeyWithTrailingSeparator(dbName, tableName); - List<Partition> parts = scanPartitionsWithFilter(keyPrefix, HBaseUtils.getEndPrefix(keyPrefix), -1, null); + byte[] keyPrefix = HBaseUtils.buildPartitionKey(dbName, tableName, new ArrayList<String>(), + new ArrayList<String>(), false); + List<Partition> parts = scanPartitionsWithFilter(dbName, tableName, keyPrefix, HBaseUtils.getEndPrefix(keyPrefix), -1, null); partCache.put(dbName, tableName, parts, true); return maxPartitions < parts.size() ? parts.subList(0, maxPartitions) : parts; } @@ -672,72 +683,68 @@ public class HBaseReadWrite { if (table == null) { throw new NoSuchObjectException("Unable to find table " + dbName + "." 
+ tableName); } - if (partVals.size() == table.getPartitionKeys().size()) { - keyPrefix = HBaseUtils.buildKey(keyElements.toArray(new String[keyElements.size()])); - } else { - keyPrefix = HBaseUtils.buildKeyWithTrailingSeparator(keyElements.toArray( - new String[keyElements.size()])); - } + keyPrefix = HBaseUtils.buildPartitionKey(dbName, tableName, + HBaseUtils.getPartitionKeyTypes(table.getPartitionKeys().subList(0, keyElements.size()-2)), + keyElements.subList(0, keyElements.size()-2)); // Now, build a filter out of the remaining keys - String regex = null; + List<PartitionKeyComparator.Range> ranges = new ArrayList<PartitionKeyComparator.Range>(); + List<Operator> ops = new ArrayList<Operator>(); if (!(partVals.size() == table.getPartitionKeys().size() && firstStar == -1)) { - StringBuilder buf = new StringBuilder(".*"); + for (int i = Math.max(0, firstStar); i < table.getPartitionKeys().size() && i < partVals.size(); i++) { - buf.append(HBaseUtils.KEY_SEPARATOR); + if ("*".equals(partVals.get(i))) { - buf.append("[^"); - buf.append(HBaseUtils.KEY_SEPARATOR); - buf.append("]+"); + PartitionKeyComparator.Range range = new PartitionKeyComparator.Range( + table.getPartitionKeys().get(i).getName(), + new PartitionKeyComparator.Mark(partVals.get(i), true), + new PartitionKeyComparator.Mark(partVals.get(i), true)); + ranges.add(range); } else { - buf.append(partVals.get(i)); + PartitionKeyComparator.Operator op = new PartitionKeyComparator.Operator( + PartitionKeyComparator.Operator.Type.LIKE, + table.getPartitionKeys().get(i).getName(), + ".*"); } } - if (partVals.size() < table.getPartitionKeys().size()) { - buf.append(HBaseUtils.KEY_SEPARATOR); - buf.append(".*"); - } - regex = buf.toString(); } Filter filter = null; - if (regex != null) { - filter = new RowFilter(CompareFilter.CompareOp.EQUAL, new RegexStringComparator(regex)); + if (!ranges.isEmpty() || !ops.isEmpty()) { + filter = new RowFilter(CompareFilter.CompareOp.EQUAL, new PartitionKeyComparator( + 
StringUtils.join(HBaseUtils.getPartitionNames(table.getPartitionKeys()), ","), + StringUtils.join(HBaseUtils.getPartitionKeyTypes(table.getPartitionKeys()), ","), + ranges, ops)); } if (LOG.isDebugEnabled()) { LOG.debug("Scanning partitions with prefix <" + new String(keyPrefix) + "> and filter <" + - regex + ">"); + filter + ">"); } - List<Partition> parts = scanPartitionsWithFilter(keyPrefix, HBaseUtils.getEndPrefix(keyPrefix), maxPartitions, filter); + List<Partition> parts = scanPartitionsWithFilter(dbName, tableName, keyPrefix, + HBaseUtils.getEndPrefix(keyPrefix), maxPartitions, filter); partCache.put(dbName, tableName, parts, false); return parts; } List<Partition> scanPartitions(String dbName, String tableName, byte[] keyStart, byte[] keyEnd, Filter filter, int maxPartitions) throws IOException, NoSuchObjectException { - List<String> keyElements = new ArrayList<>(); - keyElements.add(dbName); - keyElements.add(tableName); - - byte[] keyPrefix = - HBaseUtils.buildKeyWithTrailingSeparator(keyElements.toArray(new String[keyElements.size()])); - byte[] startRow = ArrayUtils.addAll(keyPrefix, keyStart); + byte[] startRow = keyStart; byte[] endRow; if (keyEnd == null || keyEnd.length == 0) { // stop when current db+table entries are over - endRow = HBaseUtils.getEndPrefix(keyPrefix); + endRow = HBaseUtils.getEndPrefix(startRow); } else { - endRow = ArrayUtils.addAll(keyPrefix, keyEnd); + endRow = keyEnd; } if (LOG.isDebugEnabled()) { LOG.debug("Scanning partitions with start row <" + new String(startRow) + "> and end row <" + new String(endRow) + ">"); } - return scanPartitionsWithFilter(startRow, endRow, maxPartitions, filter); + return scanPartitionsWithFilter(dbName, tableName, startRow, endRow, maxPartitions, filter); } @@ -762,7 +769,8 @@ public class HBaseReadWrite { Partition p = getPartition(dbName, tableName, partVals, false); decrementStorageDescriptorRefCount(p.getSd()); } - byte[] key = HBaseUtils.buildPartitionKey(dbName, tableName, partVals); + 
byte[] key = HBaseUtils.buildPartitionKey(dbName, tableName, + HBaseUtils.getPartitionKeyTypes(getTable(dbName, tableName).getPartitionKeys()), partVals); delete(PART_TABLE, key, null, null); } @@ -770,7 +778,8 @@ public class HBaseReadWrite { boolean populateCache) throws IOException { Partition cached = partCache.get(dbName, tableName, partVals); if (cached != null) return cached; - byte[] key = HBaseUtils.buildPartitionKey(dbName, tableName, partVals); + byte[] key = HBaseUtils.buildPartitionKey(dbName, tableName, + HBaseUtils.getPartitionKeyTypes(getTable(dbName, tableName).getPartitionKeys()), partVals); byte[] serialized = read(PART_TABLE, key, CATALOG_CF, CATALOG_COL); if (serialized == null) return null; HBaseUtils.StorageDescriptorParts sdParts = @@ -781,17 +790,18 @@ public class HBaseReadWrite { return sdParts.containingPartition; } - private List<Partition> scanPartitionsWithFilter(byte[] startRow, byte [] endRow, - int maxResults, Filter filter) + private List<Partition> scanPartitionsWithFilter(String dbName, String tableName, + byte[] startRow, byte [] endRow, int maxResults, Filter filter) throws IOException { Iterator<Result> iter = scan(PART_TABLE, startRow, endRow, CATALOG_CF, CATALOG_COL, filter); + List<FieldSchema> tablePartitions = getTable(dbName, tableName).getPartitionKeys(); List<Partition> parts = new ArrayList<>(); int numToFetch = maxResults < 0 ? 
Integer.MAX_VALUE : maxResults; for (int i = 0; i < numToFetch && iter.hasNext(); i++) { Result result = iter.next(); - HBaseUtils.StorageDescriptorParts sdParts = HBaseUtils.deserializePartition(result.getRow(), - result.getValue(CATALOG_CF, CATALOG_COL)); + HBaseUtils.StorageDescriptorParts sdParts = HBaseUtils.deserializePartition(dbName, tableName, + tablePartitions, result.getRow(), result.getValue(CATALOG_CF, CATALOG_COL)); StorageDescriptor sd = getStorageDescriptor(sdParts.sdHash); HBaseUtils.assembleStorageDescriptor(sd, sdParts); parts.add(sdParts.containingPartition); @@ -1558,7 +1568,9 @@ public class HBaseReadWrite { for (int i = 0; i < partNames.size(); i++) { valToPartMap.put(partVals.get(i), partNames.get(i)); - byte[] partKey = HBaseUtils.buildPartitionKey(dbName, tblName, partVals.get(i)); + byte[] partKey = HBaseUtils.buildPartitionKey(dbName, tblName, + HBaseUtils.getPartitionKeyTypes(getTable(dbName, tblName).getPartitionKeys()), + partVals.get(i)); Get get = new Get(partKey); for (byte[] colName : colNameBytes) { get.addColumn(STATS_CF, colName); @@ -1690,9 +1702,11 @@ public class HBaseReadWrite { return keys; } - private byte[] getStatisticsKey(String dbName, String tableName, List<String> partVals) { + private byte[] getStatisticsKey(String dbName, String tableName, List<String> partVals) throws IOException { return partVals == null ? 
HBaseUtils.buildKey(dbName, tableName) : HBaseUtils - .buildPartitionKey(dbName, tableName, partVals); + .buildPartitionKey(dbName, tableName, + HBaseUtils.getPartitionKeyTypes(getTable(dbName, tableName).getPartitionKeys()), + partVals); } private String getStatisticsTable(List<String> partVals) { http://git-wip-us.apache.org/repos/asf/hive/blob/5e16d53e/metastore/src/java/org/apache/hadoop/hive/metastore/hbase/HBaseStore.java ---------------------------------------------------------------------- diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/hbase/HBaseStore.java b/metastore/src/java/org/apache/hadoop/hive/metastore/hbase/HBaseStore.java index 0204f37..717e094 100644 --- a/metastore/src/java/org/apache/hadoop/hive/metastore/hbase/HBaseStore.java +++ b/metastore/src/java/org/apache/hadoop/hive/metastore/hbase/HBaseStore.java @@ -541,7 +541,8 @@ public class HBaseStore implements RawStore { boolean commit = false; openTransaction(); try { - List<Partition> oldParts = getHBase().getPartitions(db_name, tbl_name, part_vals_list); + List<Partition> oldParts = getHBase().getPartitions(db_name, tbl_name, + HBaseUtils.getPartitionKeyTypes(getTable(db_name, tbl_name).getPartitionKeys()), part_vals_list); getHBase().replacePartitions(oldParts, new_parts); for (List<String> part_vals : part_vals_list) { getHBase().getStatsCache().invalidate(db_name, tbl_name, @@ -634,10 +635,8 @@ public class HBaseStore implements RawStore { if (table == null) { throw new NoSuchObjectException("Unable to find table " + dbName + "." 
+ tblName); } - String firstPartitionColumn = table.getPartitionKeys().get(0).getName(); // general hbase filter plan from expression tree - PlanResult planRes = HBaseFilterPlanUtil.getFilterPlan(exprTree, firstPartitionColumn); - + PlanResult planRes = HBaseFilterPlanUtil.getFilterPlan(exprTree, table.getPartitionKeys()); if (LOG.isDebugEnabled()) { LOG.debug("Hbase Filter Plan generated : " + planRes.plan); } @@ -648,7 +647,9 @@ public class HBaseStore implements RawStore { for (ScanPlan splan : planRes.plan.getPlans()) { try { List<Partition> parts = getHBase().scanPartitions(dbName, tblName, - splan.getStartRowSuffix(), splan.getEndRowSuffix(), null, -1); + splan.getStartRowSuffix(dbName, tblName, table.getPartitionKeys()), + splan.getEndRowSuffix(dbName, tblName, table.getPartitionKeys()), + splan.getFilter(table.getPartitionKeys()), -1); boolean reachedMax = false; for (Partition part : parts) { mergedParts.put(part.getValues(), part); http://git-wip-us.apache.org/repos/asf/hive/blob/5e16d53e/metastore/src/java/org/apache/hadoop/hive/metastore/hbase/HBaseUtils.java ---------------------------------------------------------------------- diff --git a/metastore/src/java/org/apache/hadoop/hive/metastore/hbase/HBaseUtils.java b/metastore/src/java/org/apache/hadoop/hive/metastore/hbase/HBaseUtils.java index 62bb4de..b6fa591 100644 --- a/metastore/src/java/org/apache/hadoop/hive/metastore/hbase/HBaseUtils.java +++ b/metastore/src/java/org/apache/hadoop/hive/metastore/hbase/HBaseUtils.java @@ -18,11 +18,14 @@ */ package org.apache.hadoop.hive.metastore.hbase; +import com.google.common.collect.Lists; import com.google.protobuf.ByteString; import com.google.protobuf.InvalidProtocolBufferException; + import org.apache.commons.lang.StringUtils; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hive.metastore.api.AggrStats; import 
org.apache.hadoop.hive.metastore.api.BinaryColumnStatsData; import org.apache.hadoop.hive.metastore.api.BooleanColumnStatsData; @@ -50,6 +53,19 @@ import org.apache.hadoop.hive.metastore.api.SkewedInfo; import org.apache.hadoop.hive.metastore.api.StorageDescriptor; import org.apache.hadoop.hive.metastore.api.StringColumnStatsData; import org.apache.hadoop.hive.metastore.api.Table; +import org.apache.hadoop.hive.serde.serdeConstants; +import org.apache.hadoop.hive.serde2.ByteStream.Output; +import org.apache.hadoop.hive.serde2.SerDeException; +import org.apache.hadoop.hive.serde2.binarysortable.BinarySortableSerDe; +import org.apache.hadoop.hive.serde2.binarysortable.BinarySortableSerDeWithEndPrefix; +import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector; +import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorConverters; +import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorConverters.Converter; +import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector.PrimitiveCategory; +import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory; +import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo; +import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils; +import org.apache.hadoop.io.BytesWritable; import org.apache.hive.common.util.BloomFilter; import java.io.IOException; @@ -63,6 +79,7 @@ import java.util.Deque; import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.Properties; import java.util.SortedMap; import java.util.SortedSet; import java.util.TreeMap; @@ -712,15 +729,31 @@ class HBaseUtils { return sd; } + static List<String> getPartitionKeyTypes(List<FieldSchema> parts) { + com.google.common.base.Function<FieldSchema, String> fieldSchemaToType = + new com.google.common.base.Function<FieldSchema, String>() { + public String apply(FieldSchema fs) { return fs.getType(); } + }; + return Lists.transform(parts, fieldSchemaToType); + } + 
+ static List<String> getPartitionNames(List<FieldSchema> parts) { + com.google.common.base.Function<FieldSchema, String> fieldSchemaToName = + new com.google.common.base.Function<FieldSchema, String>() { + public String apply(FieldSchema fs) { return fs.getName(); } + }; + return Lists.transform(parts, fieldSchemaToName); + } + /** * Serialize a partition * @param part partition object * @param sdHash hash that is being used as a key for the enclosed storage descriptor * @return First element is the key, second is the serialized partition */ - static byte[][] serializePartition(Partition part, byte[] sdHash) { + static byte[][] serializePartition(Partition part, List<String> partTypes, byte[] sdHash) { byte[][] result = new byte[2][]; - result[0] = buildPartitionKey(part.getDbName(), part.getTableName(), part.getValues()); + result[0] = buildPartitionKey(part.getDbName(), part.getTableName(), partTypes, part.getValues()); HbaseMetastoreProto.Partition.Builder builder = HbaseMetastoreProto.Partition.newBuilder(); builder .setCreateTime(part.getCreateTime()) @@ -735,11 +768,54 @@ class HBaseUtils { return result; } - static byte[] buildPartitionKey(String dbName, String tableName, List<String> partVals) { - Deque<String> keyParts = new ArrayDeque<>(partVals); - keyParts.addFirst(tableName); - keyParts.addFirst(dbName); - return buildKey(keyParts.toArray(new String[keyParts.size()])); + static byte[] buildPartitionKey(String dbName, String tableName, List<String> partTypes, List<String> partVals) { + return buildPartitionKey(dbName, tableName, partTypes, partVals, false); + } + + static byte[] buildPartitionKey(String dbName, String tableName, List<String> partTypes, List<String> partVals, boolean endPrefix) { + Object[] components = new Object[partVals.size()]; + for (int i=0;i<partVals.size();i++) { + TypeInfo expectedType = + TypeInfoUtils.getTypeInfoFromTypeString(partTypes.get(i)); + ObjectInspector outputOI = + 
TypeInfoUtils.getStandardJavaObjectInspectorFromTypeInfo(expectedType); + Converter converter = ObjectInspectorConverters.getConverter( + PrimitiveObjectInspectorFactory.javaStringObjectInspector, outputOI); + components[i] = converter.convert(partVals.get(i)); + } + + return buildSerializedPartitionKey(dbName, tableName, partTypes, components, endPrefix); + } + + static byte[] buildSerializedPartitionKey(String dbName, String tableName, List<String> partTypes, Object[] components, boolean endPrefix) { + ObjectInspector javaStringOI = + PrimitiveObjectInspectorFactory.getPrimitiveJavaObjectInspector(PrimitiveCategory.STRING); + Object[] data = new Object[components.length+2]; + List<ObjectInspector> fois = new ArrayList<ObjectInspector>(components.length+2); + boolean[] endPrefixes = new boolean[components.length+2]; + + data[0] = dbName; + fois.add(javaStringOI); + endPrefixes[0] = false; + data[1] = tableName; + fois.add(javaStringOI); + endPrefixes[1] = false; + + for (int i = 0; i < components.length; i++) { + data[i+2] = components[i]; + TypeInfo expectedType = + TypeInfoUtils.getTypeInfoFromTypeString(partTypes.get(i)); + ObjectInspector outputOI = + TypeInfoUtils.getStandardJavaObjectInspectorFromTypeInfo(expectedType); + fois.add(outputOI); + } + Output output = new Output(); + try { + BinarySortableSerDeWithEndPrefix.serializeStruct(output, data, fois, endPrefix); + } catch (SerDeException e) { + throw new RuntimeException("Cannot serialize partition " + StringUtils.join(components, ",")); + } + return Arrays.copyOf(output.getData(), output.getLength()); } static class StorageDescriptorParts { @@ -771,11 +847,10 @@ class HBaseUtils { * @param serialized the value fetched from HBase * @return A struct that contains the partition plus parts of the storage descriptor */ - static StorageDescriptorParts deserializePartition(byte[] key, byte[] serialized) - throws InvalidProtocolBufferException { - String[] keys = deserializeKey(key); - return 
deserializePartition(keys[0], keys[1], - Arrays.asList(Arrays.copyOfRange(keys, 2, keys.length)), serialized); + static StorageDescriptorParts deserializePartition(String dbName, String tableName, List<FieldSchema> partitions, + byte[] key, byte[] serialized) throws InvalidProtocolBufferException { + List keys = deserializePartitionKey(partitions, key); + return deserializePartition(dbName, tableName, keys, serialized); } /** @@ -811,6 +886,36 @@ class HBaseUtils { return k.split(KEY_SEPARATOR_STR); } + private static List<String> deserializePartitionKey(List<FieldSchema> partitions, byte[] key) { + StringBuffer names = new StringBuffer(); + names.append("dbName,tableName,"); + StringBuffer types = new StringBuffer(); + types.append("string,string,"); + for (int i=0;i<partitions.size();i++) { + names.append(partitions.get(i).getName()); + types.append(TypeInfoUtils.getTypeInfoFromTypeString(partitions.get(i).getType())); + if (i!=partitions.size()-1) { + names.append(","); + types.append(","); + } + } + BinarySortableSerDe serDe = new BinarySortableSerDe(); + Properties props = new Properties(); + props.setProperty(serdeConstants.LIST_COLUMNS, names.toString()); + props.setProperty(serdeConstants.LIST_COLUMN_TYPES, types.toString()); + try { + serDe.initialize(new Configuration(), props); + List deserializedkeys = ((List)serDe.deserialize(new BytesWritable(key))).subList(2, partitions.size()+2); + List<String> partitionKeys = new ArrayList<String>(); + for (Object deserializedKey : deserializedkeys) { + partitionKeys.add(deserializedKey.toString()); + } + return partitionKeys; + } catch (SerDeException e) { + throw new RuntimeException("Error when deserialize key", e); + } + } + /** * Serialize a table * @param table table object http://git-wip-us.apache.org/repos/asf/hive/blob/5e16d53e/metastore/src/java/org/apache/hadoop/hive/metastore/hbase/PartitionKeyComparator.java ---------------------------------------------------------------------- diff --git 
a/metastore/src/java/org/apache/hadoop/hive/metastore/hbase/PartitionKeyComparator.java b/metastore/src/java/org/apache/hadoop/hive/metastore/hbase/PartitionKeyComparator.java new file mode 100644 index 0000000..01fe403 --- /dev/null +++ b/metastore/src/java/org/apache/hadoop/hive/metastore/hbase/PartitionKeyComparator.java @@ -0,0 +1,292 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.hadoop.hive.metastore.hbase; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; +import java.util.Properties; + +import org.apache.commons.lang.ArrayUtils; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.filter.ByteArrayComparable; +import org.apache.hadoop.hive.metastore.hbase.HbaseMetastoreProto.PartitionKeyComparator.Operator.Type; +import org.apache.hadoop.hive.serde.serdeConstants; +import org.apache.hadoop.hive.serde2.SerDeException; +import org.apache.hadoop.hive.serde2.binarysortable.BinarySortableSerDe; +import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector; +import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorConverters; +import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorConverters.Converter; +import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory; +import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo; +import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils; +import org.apache.hadoop.io.BytesWritable; + +import com.google.protobuf.InvalidProtocolBufferException; + +public class PartitionKeyComparator extends ByteArrayComparable { + private static final Log LOG = LogFactory.getLog(PartitionKeyComparator.class); + static class Mark { + Mark(String value, boolean inclusive) { + this.value = value; + this.inclusive = inclusive; + } + String value; + boolean inclusive; + public String toString() { + return value + (inclusive?"_":""); + } + } + static class Range { + Range(String keyName, Mark start, Mark end) { + this.keyName = keyName; + this.start = start; + this.end = end; + } + String keyName; + Mark start; + Mark end; + public String toString() { + return "" + keyName + ":" + (start!=null?start.toString():"") + (end!=null?end.toString():""); + } + } + // Cache the information derived from 
ranges for performance, including + // range in native datatype + static class NativeRange { + int pos; + Comparable start; + Comparable end; + } + static class Operator { + public Operator(Type type, String keyName, String val) { + this.type = type; + this.keyName = keyName; + this.val = val; + } + enum Type { + LIKE, NOTEQUALS + }; + Type type; + String keyName; + String val; + } + static class NativeOperator { + int pos; + Comparable val; + } + String names; + String types; + List<Range> ranges; + List<NativeRange> nativeRanges; + List<Operator> ops; + List<NativeOperator> nativeOps; + Properties serdeProps; + public PartitionKeyComparator(String names, String types, List<Range> ranges, List<Operator> ops) { + super(null); + this.names = names; + this.types = types; + this.ranges = ranges; + this.ops = ops; + serdeProps = new Properties(); + serdeProps.setProperty(serdeConstants.LIST_COLUMNS, "dbName,tableName," + names); + serdeProps.setProperty(serdeConstants.LIST_COLUMN_TYPES, "string,string," + types); + + this.nativeRanges = new ArrayList<NativeRange>(this.ranges.size()); + for (int i=0;i<ranges.size();i++) { + Range range = ranges.get(i); + NativeRange nativeRange = new NativeRange();; + nativeRanges.add(i, nativeRange); + nativeRange.pos = Arrays.asList(names.split(",")).indexOf(range.keyName); + TypeInfo expectedType = + TypeInfoUtils.getTypeInfoFromTypeString(types.split(",")[nativeRange.pos]); + ObjectInspector outputOI = + TypeInfoUtils.getStandardWritableObjectInspectorFromTypeInfo(expectedType); + nativeRange.start = null; + if (range.start != null) { + Converter converter = ObjectInspectorConverters.getConverter( + PrimitiveObjectInspectorFactory.javaStringObjectInspector, outputOI); + nativeRange.start = (Comparable)converter.convert(range.start.value); + } + nativeRange.end = null; + if (range.end != null) { + Converter converter = ObjectInspectorConverters.getConverter( + PrimitiveObjectInspectorFactory.javaStringObjectInspector, outputOI); + 
nativeRange.end = (Comparable)converter.convert(range.end.value); + } + } + + this.nativeOps = new ArrayList<NativeOperator>(this.ops.size()); + for (int i=0;i<ops.size();i++) { + Operator op = ops.get(i); + NativeOperator nativeOp = new NativeOperator(); + nativeOps.add(i, nativeOp); + nativeOp.pos = ArrayUtils.indexOf(names.split(","), op.keyName); + TypeInfo expectedType = + TypeInfoUtils.getTypeInfoFromTypeString(types.split(",")[nativeOp.pos]); + ObjectInspector outputOI = + TypeInfoUtils.getStandardWritableObjectInspectorFromTypeInfo(expectedType); + Converter converter = ObjectInspectorConverters.getConverter( + PrimitiveObjectInspectorFactory.javaStringObjectInspector, outputOI); + nativeOp.val = (Comparable)converter.convert(op.val); + } + } + + public static PartitionKeyComparator parseFrom(final byte [] bytes) { + HbaseMetastoreProto.PartitionKeyComparator proto; + try { + proto = HbaseMetastoreProto.PartitionKeyComparator.parseFrom(bytes); + } catch (InvalidProtocolBufferException e) { + throw new RuntimeException(e); + } + List<Range> ranges = new ArrayList<Range>(); + for (HbaseMetastoreProto.PartitionKeyComparator.Range range : proto.getRangeList()) { + Mark start = null; + if (range.hasStart()) { + start = new Mark(range.getStart().getValue(), range.getStart().getInclusive()); + } + Mark end = null; + if (range.hasEnd()) { + end = new Mark(range.getEnd().getValue(), range.getEnd().getInclusive()); + } + ranges.add(new Range(range.getKey(), start, end)); + } + List<Operator> ops = new ArrayList<Operator>(); + for (HbaseMetastoreProto.PartitionKeyComparator.Operator op : proto.getOpList()) { + ops.add(new Operator(Operator.Type.valueOf(op.getType().name()), op.getKey(), + op.getVal())); + } + return new PartitionKeyComparator(proto.getNames(), proto.getTypes(), ranges, ops); + } + + @Override + public byte[] toByteArray() { + HbaseMetastoreProto.PartitionKeyComparator.Builder builder = + HbaseMetastoreProto.PartitionKeyComparator.newBuilder(); + 
builder.setNames(names); + builder.setTypes(types); + for (int i=0;i<ranges.size();i++) { + Range range = ranges.get(i); + HbaseMetastoreProto.PartitionKeyComparator.Mark startMark = null; + if (range.start != null) { + startMark = HbaseMetastoreProto.PartitionKeyComparator.Mark.newBuilder() + .setValue(range.start.value) + .setInclusive(range.start.inclusive) + .build(); + } + HbaseMetastoreProto.PartitionKeyComparator.Mark endMark = null; + if (range.end != null) { + endMark = HbaseMetastoreProto.PartitionKeyComparator.Mark.newBuilder() + .setValue(range.end.value) + .setInclusive(range.end.inclusive) + .build(); + } + + HbaseMetastoreProto.PartitionKeyComparator.Range.Builder rangeBuilder = + HbaseMetastoreProto.PartitionKeyComparator.Range.newBuilder(); + rangeBuilder.setKey(range.keyName); + if (startMark != null) { + rangeBuilder.setStart(startMark); + } + if (endMark != null) { + rangeBuilder.setEnd(endMark); + } + builder.addRange(rangeBuilder.build()); + } + for (int i=0;i<ops.size();i++) { + Operator op = ops.get(i); + builder.addOp(HbaseMetastoreProto.PartitionKeyComparator.Operator.newBuilder() + .setKey(op.keyName) + .setType(Type.valueOf(op.type.toString())) + .setVal(op.val).build()); + } + return builder.build().toByteArray(); + } + + @Override + public int compareTo(byte[] value, int offset, int length) { + byte[] bytes = Arrays.copyOfRange(value, offset, offset + length); + if (LOG.isDebugEnabled()) { + LOG.debug("Get key " + new String(bytes)); + } + BinarySortableSerDe serDe = new BinarySortableSerDe(); + List deserializedkeys = null; + try { + serDe.initialize(new Configuration(), serdeProps); + deserializedkeys = ((List)serDe.deserialize(new BytesWritable(bytes))).subList(2, 2 + names.split(",").length); + } catch (SerDeException e) { + // don't bother with failed deserialization, continue with next key + return 1; + } + for (int i=0;i<ranges.size();i++) { + Range range = ranges.get(i); + NativeRange nativeRange = nativeRanges.get(i); + + 
Comparable partVal = (Comparable)deserializedkeys.get(nativeRange.pos); + + if (LOG.isDebugEnabled()) { + LOG.debug("Try to match range " + partVal + ", start " + nativeRange.start + ", end " + + nativeRange.end); + } + if (range.start == null || range.start.inclusive && partVal.compareTo(nativeRange.start)>=0 || + !range.start.inclusive && partVal.compareTo(nativeRange.start)>0) { + if (range.end == null || range.end.inclusive && partVal.compareTo(nativeRange.end)<=0 || + !range.end.inclusive && partVal.compareTo(nativeRange.end)<0) { + continue; + } + } + if (LOG.isDebugEnabled()) { + LOG.debug("Fail to match range " + range.keyName + "-" + partVal + "[" + nativeRange.start + + "," + nativeRange.end + "]"); + } + return 1; + } + + for (int i=0;i<ops.size();i++) { + Operator op = ops.get(i); + NativeOperator nativeOp = nativeOps.get(i); + switch (op.type) { + case LIKE: + if (!deserializedkeys.get(nativeOp.pos).toString().matches(op.val)) { + if (LOG.isDebugEnabled()) { + LOG.debug("Fail to match operator " + op.keyName + "(" + deserializedkeys.get(nativeOp.pos) + + ") LIKE " + nativeOp.val); + } + return 1; + } + break; + case NOTEQUALS: + if (nativeOp.val.equals(deserializedkeys.get(nativeOp.pos))) { + if (LOG.isDebugEnabled()) { + LOG.debug("Fail to match operator " + op.keyName + "(" + deserializedkeys.get(nativeOp.pos) + + ")!=" + nativeOp.val); + } + return 1; + } + break; + } + } + if (LOG.isDebugEnabled()) { + LOG.debug("All conditions satisfied:" + deserializedkeys); + } + return 0; + } + +} http://git-wip-us.apache.org/repos/asf/hive/blob/5e16d53e/metastore/src/protobuf/org/apache/hadoop/hive/metastore/hbase/hbase_metastore_proto.proto ---------------------------------------------------------------------- diff --git a/metastore/src/protobuf/org/apache/hadoop/hive/metastore/hbase/hbase_metastore_proto.proto b/metastore/src/protobuf/org/apache/hadoop/hive/metastore/hbase/hbase_metastore_proto.proto index cba3671..0d0ef89 100644 --- 
a/metastore/src/protobuf/org/apache/hadoop/hive/metastore/hbase/hbase_metastore_proto.proto +++ b/metastore/src/protobuf/org/apache/hadoop/hive/metastore/hbase/hbase_metastore_proto.proto @@ -255,3 +255,28 @@ message Table { optional PrincipalPrivilegeSet privileges = 13; optional bool is_temporary = 14; } + +message PartitionKeyComparator { + required string names = 1; + required string types = 2; + message Mark { + required string value = 1; + required bool inclusive = 2; + } + message Range { + required string key = 1; + optional Mark start = 2; + optional Mark end = 3; + } + message Operator { + enum Type { + LIKE = 0; + NOTEQUALS = 1; + } + required Type type = 1; + required string key = 2; + required string val = 3; + } + repeated Operator op = 3; + repeated Range range = 4; +} http://git-wip-us.apache.org/repos/asf/hive/blob/5e16d53e/metastore/src/test/org/apache/hadoop/hive/metastore/hbase/TestHBaseFilterPlanUtil.java ---------------------------------------------------------------------- diff --git a/metastore/src/test/org/apache/hadoop/hive/metastore/hbase/TestHBaseFilterPlanUtil.java b/metastore/src/test/org/apache/hadoop/hive/metastore/hbase/TestHBaseFilterPlanUtil.java index 5943d14..06884b3 100644 --- a/metastore/src/test/org/apache/hadoop/hive/metastore/hbase/TestHBaseFilterPlanUtil.java +++ b/metastore/src/test/org/apache/hadoop/hive/metastore/hbase/TestHBaseFilterPlanUtil.java @@ -18,12 +18,17 @@ */ package org.apache.hadoop.hive.metastore.hbase; +import java.util.ArrayList; import java.util.Arrays; +import java.util.List; +import org.apache.hadoop.hbase.filter.RowFilter; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hive.metastore.PartFilterExprUtil; +import org.apache.hadoop.hive.metastore.api.FieldSchema; import org.apache.hadoop.hive.metastore.api.MetaException; import org.apache.hadoop.hive.metastore.hbase.HBaseFilterPlanUtil.FilterPlan; import org.apache.hadoop.hive.metastore.hbase.HBaseFilterPlanUtil.MultiScanPlan; 
-import org.apache.hadoop.hive.metastore.hbase.HBaseFilterPlanUtil.PartitionFilterGenerator; import org.apache.hadoop.hive.metastore.hbase.HBaseFilterPlanUtil.PlanResult; import org.apache.hadoop.hive.metastore.hbase.HBaseFilterPlanUtil.ScanPlan; import org.apache.hadoop.hive.metastore.hbase.HBaseFilterPlanUtil.ScanPlan.ScanMarker; @@ -35,6 +40,8 @@ import org.apache.hadoop.hive.metastore.parser.ExpressionTree.TreeNode; import org.junit.Assert; import org.junit.Test; +import com.google.common.primitives.Shorts; + public class TestHBaseFilterPlanUtil { final boolean INCLUSIVE = true; @@ -68,31 +75,28 @@ public class TestHBaseFilterPlanUtil { ScanMarker r; // equal plans - l = new ScanMarker(new byte[] { 1, 2 }, INCLUSIVE); - r = new ScanMarker(new byte[] { 1, 2 }, INCLUSIVE); + l = new ScanMarker("1", INCLUSIVE, "int"); + r = new ScanMarker("1", INCLUSIVE, "int"); assertFirstGreater(l, r); - l = new ScanMarker(new byte[] { 1, 2 }, !INCLUSIVE); - r = new ScanMarker(new byte[] { 1, 2 }, !INCLUSIVE); + l = new ScanMarker("1", !INCLUSIVE, "int"); + r = new ScanMarker("1", !INCLUSIVE, "int"); assertFirstGreater(l, r); - l = new ScanMarker(null, !INCLUSIVE); - r = new ScanMarker(null, !INCLUSIVE); - assertFirstGreater(l, r); + assertFirstGreater(null, null); // create l is greater because of inclusive flag - l = new ScanMarker(new byte[] { 1, 2 }, !INCLUSIVE); - r = new ScanMarker(null, !INCLUSIVE); + l = new ScanMarker("1", !INCLUSIVE, "int"); // the rule for null vs non-null is different // non-null is both smaller and greater than null - Assert.assertEquals(l, ScanPlan.getComparedMarker(l, r, true)); - Assert.assertEquals(l, ScanPlan.getComparedMarker(r, l, true)); - Assert.assertEquals(l, ScanPlan.getComparedMarker(l, r, false)); - Assert.assertEquals(l, ScanPlan.getComparedMarker(r, l, false)); + Assert.assertEquals(l, ScanPlan.getComparedMarker(l, null, true)); + Assert.assertEquals(l, ScanPlan.getComparedMarker(null, l, true)); + Assert.assertEquals(l, 
ScanPlan.getComparedMarker(l, null, false)); + Assert.assertEquals(l, ScanPlan.getComparedMarker(null, l, false)); // create l that is greater because of the bytes - l = new ScanMarker(new byte[] { 1, 2, 0 }, INCLUSIVE); - r = new ScanMarker(new byte[] { 1, 2 }, INCLUSIVE); + l = new ScanMarker("2", INCLUSIVE, "int"); + r = new ScanMarker("1", INCLUSIVE, "int"); assertFirstGreater(l, r); } @@ -111,36 +115,30 @@ public class TestHBaseFilterPlanUtil { public void testScanPlanAnd() { ScanPlan l = new ScanPlan(); ScanPlan r = new ScanPlan(); - l.setStartMarker(new ScanMarker(new byte[] { 10 }, INCLUSIVE)); - r.setStartMarker(new ScanMarker(new byte[] { 10 }, INCLUSIVE)); + l.setStartMarker("a", "int", "10", INCLUSIVE); + r.setStartMarker("a", "int", "10", INCLUSIVE); ScanPlan res; // both equal res = l.and(r).getPlans().get(0); - Assert.assertEquals(new ScanMarker(new byte[] { 10 }, INCLUSIVE), res.getStartMarker()); + Assert.assertEquals(new ScanMarker("10", INCLUSIVE, "int"), res.markers.get("a").startMarker); // add equal end markers as well, and test AND again - l.setEndMarker(new ScanMarker(new byte[] { 20 }, INCLUSIVE)); - r.setEndMarker(new ScanMarker(new byte[] { 20 }, INCLUSIVE)); + l.setEndMarker("a", "int", "20", INCLUSIVE); + r.setEndMarker("a", "int", "20", INCLUSIVE); res = l.and(r).getPlans().get(0); - Assert.assertEquals(new ScanMarker(new byte[] { 10 }, INCLUSIVE), res.getStartMarker()); - Assert.assertEquals(new ScanMarker(new byte[] { 20 }, INCLUSIVE), res.getEndMarker()); - - l.setEndMarker(new ScanMarker(null, INCLUSIVE)); - r.setStartMarker(new ScanMarker(null, !INCLUSIVE)); - // markers with non null bytes are both lesser and greator - Assert.assertEquals(l.getStartMarker(), res.getStartMarker()); - Assert.assertEquals(r.getEndMarker(), res.getEndMarker()); + Assert.assertEquals(new ScanMarker("10", INCLUSIVE, "int"), res.markers.get("a").startMarker); + Assert.assertEquals(new ScanMarker("20", INCLUSIVE, "int"), res.markers.get("a").endMarker); 
- l.setStartMarker(new ScanMarker(new byte[] { 10, 11 }, !INCLUSIVE)); - l.setEndMarker(new ScanMarker(new byte[] { 20, 21 }, INCLUSIVE)); + l.setStartMarker("a", "int", "10", !INCLUSIVE); + l.setEndMarker("a", "int", "20", INCLUSIVE); - r.setStartMarker(new ScanMarker(new byte[] { 10, 10 }, INCLUSIVE)); - r.setEndMarker(new ScanMarker(new byte[] { 15 }, INCLUSIVE)); + r.setStartMarker("a", "int", "10", INCLUSIVE); + r.setEndMarker("a", "int", "15", INCLUSIVE); res = l.and(r).getPlans().get(0); // start of l is greater, end of r is smaller - Assert.assertEquals(l.getStartMarker(), res.getStartMarker()); - Assert.assertEquals(r.getEndMarker(), res.getEndMarker()); + Assert.assertEquals(l.markers.get("a").startMarker, res.markers.get("a").startMarker); + Assert.assertEquals(r.markers.get("a").endMarker, res.markers.get("a").endMarker); } @@ -151,13 +149,13 @@ public class TestHBaseFilterPlanUtil { public void testScanPlanOr() { ScanPlan l = new ScanPlan(); ScanPlan r = new ScanPlan(); - l.setStartMarker(new ScanMarker(new byte[] { 10 }, INCLUSIVE)); - r.setStartMarker(new ScanMarker(new byte[] { 11 }, INCLUSIVE)); + l.setStartMarker("a", "int", "1", INCLUSIVE); + r.setStartMarker("a", "int", "11", INCLUSIVE); FilterPlan res1 = l.or(r); Assert.assertEquals(2, res1.getPlans().size()); - res1.getPlans().get(0).getStartMarker().equals(l.getStartMarker()); - res1.getPlans().get(1).getStartMarker().equals(r.getStartMarker()); + res1.getPlans().get(0).markers.get("a").startMarker.equals(l.markers.get("a").startMarker); + res1.getPlans().get(1).markers.get("a").startMarker.equals(r.markers.get("a").startMarker); FilterPlan res2 = res1.or(r); Assert.assertEquals(3, res2.getPlans().size()); @@ -223,72 +221,71 @@ public class TestHBaseFilterPlanUtil { final String KEY = "k1"; final String VAL = "v1"; - final byte[] VAL_BYTES = PartitionFilterGenerator.toBytes(VAL); + final String OTHERKEY = "k2"; LeafNode l = new LeafNode(); l.keyName = KEY; l.value = VAL; - final ScanMarker 
DEFAULT_SCANMARKER = new ScanMarker(null, false); + final ScanMarker DEFAULT_SCANMARKER = null; + List<FieldSchema> parts = new ArrayList<FieldSchema>(); + parts.add(new FieldSchema(KEY, "int", null)); + parts.add(new FieldSchema(OTHERKEY, "int", null)); l.operator = Operator.EQUALS; - verifyPlan(l, KEY, new ScanMarker(VAL_BYTES, INCLUSIVE), new ScanMarker(VAL_BYTES, INCLUSIVE)); + verifyPlan(l, parts, KEY, new ScanMarker(VAL, INCLUSIVE, "int"), new ScanMarker(VAL, INCLUSIVE, "int")); l.operator = Operator.GREATERTHAN; - verifyPlan(l, KEY, new ScanMarker(VAL_BYTES, !INCLUSIVE), DEFAULT_SCANMARKER); + verifyPlan(l, parts, KEY, new ScanMarker(VAL, !INCLUSIVE, "int"), DEFAULT_SCANMARKER); l.operator = Operator.GREATERTHANOREQUALTO; - verifyPlan(l, KEY, new ScanMarker(VAL_BYTES, INCLUSIVE), DEFAULT_SCANMARKER); + verifyPlan(l, parts, KEY, new ScanMarker(VAL, INCLUSIVE, "int"), DEFAULT_SCANMARKER); l.operator = Operator.LESSTHAN; - verifyPlan(l, KEY, DEFAULT_SCANMARKER, new ScanMarker(VAL_BYTES, !INCLUSIVE)); + verifyPlan(l, parts, KEY, DEFAULT_SCANMARKER, new ScanMarker(VAL, !INCLUSIVE, "int")); l.operator = Operator.LESSTHANOREQUALTO; - verifyPlan(l, KEY, DEFAULT_SCANMARKER, new ScanMarker(VAL_BYTES, INCLUSIVE)); - - // following leaf node plans should currently have true for 'has unsupported condition', - // because of the unsupported operator - l.operator = Operator.NOTEQUALS; - verifyPlan(l, KEY, DEFAULT_SCANMARKER, DEFAULT_SCANMARKER, true); - - l.operator = Operator.NOTEQUALS2; - verifyPlan(l, KEY, DEFAULT_SCANMARKER, DEFAULT_SCANMARKER, true); - - l.operator = Operator.LIKE; - verifyPlan(l, KEY, DEFAULT_SCANMARKER, DEFAULT_SCANMARKER, true); + verifyPlan(l, parts, KEY, DEFAULT_SCANMARKER, new ScanMarker(VAL, INCLUSIVE, "int")); // following leaf node plans should currently have true for 'has unsupported condition', // because of the condition is not on first key l.operator = Operator.EQUALS; - verifyPlan(l, "NOT_FIRST_PART", DEFAULT_SCANMARKER, 
DEFAULT_SCANMARKER, true); - - l.operator = Operator.NOTEQUALS; - verifyPlan(l, "NOT_FIRST_PART", DEFAULT_SCANMARKER, DEFAULT_SCANMARKER, true); + verifyPlan(l, parts, OTHERKEY, DEFAULT_SCANMARKER, DEFAULT_SCANMARKER, false); // if tree is null, it should return equivalent of full scan, and true // for 'has unsupported condition' - verifyPlan(null, KEY, DEFAULT_SCANMARKER, DEFAULT_SCANMARKER, true); + verifyPlan(null, parts, KEY, DEFAULT_SCANMARKER, DEFAULT_SCANMARKER, true); } - private void verifyPlan(TreeNode l, String keyName, ScanMarker startMarker, ScanMarker endMarker) + private void verifyPlan(TreeNode l, List<FieldSchema> parts, String keyName, ScanMarker startMarker, ScanMarker endMarker) throws MetaException { - verifyPlan(l, keyName, startMarker, endMarker, false); + verifyPlan(l, parts, keyName, startMarker, endMarker, false); } - private void verifyPlan(TreeNode l, String keyName, ScanMarker startMarker, ScanMarker endMarker, + private void verifyPlan(TreeNode l, List<FieldSchema> parts, String keyName, ScanMarker startMarker, ScanMarker endMarker, boolean hasUnsupportedCondition) throws MetaException { ExpressionTree e = null; if (l != null) { e = new ExpressionTree(); e.setRootForTest(l); } - PlanResult planRes = HBaseFilterPlanUtil.getFilterPlan(e, keyName); + PlanResult planRes = HBaseFilterPlanUtil.getFilterPlan(e, parts); FilterPlan plan = planRes.plan; Assert.assertEquals("Has unsupported condition", hasUnsupportedCondition, planRes.hasUnsupportedCondition); Assert.assertEquals(1, plan.getPlans().size()); ScanPlan splan = plan.getPlans().get(0); - Assert.assertEquals(startMarker, splan.getStartMarker()); - Assert.assertEquals(endMarker, splan.getEndMarker()); + if (startMarker != null) { + Assert.assertEquals(startMarker, splan.markers.get(keyName).startMarker); + } else { + Assert.assertTrue(splan.markers.get(keyName)==null || + splan.markers.get(keyName).startMarker==null); + } + if (endMarker != null) { + Assert.assertEquals(endMarker, 
splan.markers.get(keyName).endMarker); + } else { + Assert.assertTrue(splan.markers.get(keyName)==null || + splan.markers.get(keyName).endMarker==null); + } } /** @@ -302,12 +299,13 @@ public class TestHBaseFilterPlanUtil { final String KEY = "k1"; final String VAL1 = "10"; final String VAL2 = "11"; - final byte[] VAL1_BYTES = PartitionFilterGenerator.toBytes(VAL1); - final byte[] VAL2_BYTES = PartitionFilterGenerator.toBytes(VAL2); LeafNode l = new LeafNode(); l.keyName = KEY; l.value = VAL1; - final ScanMarker DEFAULT_SCANMARKER = new ScanMarker(null, false); + final ScanMarker DEFAULT_SCANMARKER = null; + + List<FieldSchema> parts = new ArrayList<FieldSchema>(); + parts.add(new FieldSchema("k1", "int", null)); LeafNode r = new LeafNode(); r.keyName = KEY; @@ -318,19 +316,19 @@ public class TestHBaseFilterPlanUtil { // verify plan for - k1 >= '10' and k1 < '11' l.operator = Operator.GREATERTHANOREQUALTO; r.operator = Operator.LESSTHAN; - verifyPlan(tn, KEY, new ScanMarker(VAL1_BYTES, INCLUSIVE), new ScanMarker(VAL2_BYTES, - !INCLUSIVE)); + verifyPlan(tn, parts, KEY, new ScanMarker(VAL1, INCLUSIVE, "int"), new ScanMarker(VAL2, + !INCLUSIVE, "int")); // verify plan for - k1 >= '10' and k1 > '11' l.operator = Operator.GREATERTHANOREQUALTO; r.operator = Operator.GREATERTHAN; - verifyPlan(tn, KEY, new ScanMarker(VAL2_BYTES, !INCLUSIVE), DEFAULT_SCANMARKER); + verifyPlan(tn, parts, KEY, new ScanMarker(VAL2, !INCLUSIVE, "int"), DEFAULT_SCANMARKER); // verify plan for - k1 >= '10' or k1 > '11' tn = new TreeNode(l, LogicalOperator.OR, r); ExpressionTree e = new ExpressionTree(); e.setRootForTest(tn); - PlanResult planRes = HBaseFilterPlanUtil.getFilterPlan(e, KEY); + PlanResult planRes = HBaseFilterPlanUtil.getFilterPlan(e, parts); Assert.assertEquals(2, planRes.plan.getPlans().size()); Assert.assertEquals(false, planRes.hasUnsupportedCondition); @@ -338,7 +336,7 @@ public class TestHBaseFilterPlanUtil { TreeNode tn2 = new TreeNode(l, LogicalOperator.AND, tn); e = new 
ExpressionTree(); e.setRootForTest(tn2); - planRes = HBaseFilterPlanUtil.getFilterPlan(e, KEY); + planRes = HBaseFilterPlanUtil.getFilterPlan(e, parts); Assert.assertEquals(2, planRes.plan.getPlans().size()); Assert.assertEquals(false, planRes.hasUnsupportedCondition); @@ -351,11 +349,135 @@ public class TestHBaseFilterPlanUtil { TreeNode tn3 = new TreeNode(tn2, LogicalOperator.OR, klike); e = new ExpressionTree(); e.setRootForTest(tn3); - planRes = HBaseFilterPlanUtil.getFilterPlan(e, KEY); + planRes = HBaseFilterPlanUtil.getFilterPlan(e, parts); Assert.assertEquals(3, planRes.plan.getPlans().size()); - Assert.assertEquals(true, planRes.hasUnsupportedCondition); + Assert.assertEquals(false, planRes.hasUnsupportedCondition); + + } + @Test + public void testPartitionKeyScannerAllString() throws Exception { + List<FieldSchema> parts = new ArrayList<FieldSchema>(); + parts.add(new FieldSchema("year", "string", null)); + parts.add(new FieldSchema("month", "string", null)); + parts.add(new FieldSchema("state", "string", null)); + + // One prefix key and one minor key range + ExpressionTree exprTree = PartFilterExprUtil.getFilterParser("year = 2015 and state = 'CA'").tree; + PlanResult planRes = HBaseFilterPlanUtil.getFilterPlan(exprTree, parts); + + Assert.assertEquals(planRes.plan.getPlans().size(), 1); + + ScanPlan sp = planRes.plan.getPlans().get(0); + byte[] startRowSuffix = sp.getStartRowSuffix("testdb", "testtb", parts); + byte[] endRowSuffix = sp.getEndRowSuffix("testdb", "testtb", parts); + RowFilter filter = (RowFilter)sp.getFilter(parts); + + // scan range contains the major key year, rowfilter contains minor key state + Assert.assertTrue(Bytes.contains(startRowSuffix, "2015".getBytes())); + Assert.assertTrue(Bytes.contains(endRowSuffix, "2015".getBytes())); + Assert.assertFalse(Bytes.contains(startRowSuffix, "CA".getBytes())); + Assert.assertFalse(Bytes.contains(endRowSuffix, "CA".getBytes())); + + PartitionKeyComparator comparator = 
(PartitionKeyComparator)filter.getComparator(); + Assert.assertEquals(comparator.ranges.size(), 1); + Assert.assertEquals(comparator.ranges.get(0).keyName, "state"); + + // Two prefix key and one LIKE operator + exprTree = PartFilterExprUtil.getFilterParser("year = 2015 and month > 10 " + + "and month <= 11 and state like 'C%'").tree; + planRes = HBaseFilterPlanUtil.getFilterPlan(exprTree, parts); + + Assert.assertEquals(planRes.plan.getPlans().size(), 1); + + sp = planRes.plan.getPlans().get(0); + startRowSuffix = sp.getStartRowSuffix("testdb", "testtb", parts); + endRowSuffix = sp.getEndRowSuffix("testdb", "testtb", parts); + filter = (RowFilter)sp.getFilter(parts); + + // scan range contains the major key value year/month, rowfilter contains LIKE operator + Assert.assertTrue(Bytes.contains(startRowSuffix, "2015".getBytes())); + Assert.assertTrue(Bytes.contains(endRowSuffix, "2015".getBytes())); + Assert.assertTrue(Bytes.contains(startRowSuffix, "10".getBytes())); + Assert.assertTrue(Bytes.contains(endRowSuffix, "11".getBytes())); + + comparator = (PartitionKeyComparator)filter.getComparator(); + Assert.assertEquals(comparator.ops.size(), 1); + Assert.assertEquals(comparator.ops.get(0).keyName, "state"); + + // One prefix key, one minor key range and one LIKE operator + exprTree = PartFilterExprUtil.getFilterParser("year >= 2014 and month > 10 " + + "and month <= 11 and state like 'C%'").tree; + planRes = HBaseFilterPlanUtil.getFilterPlan(exprTree, parts); + + Assert.assertEquals(planRes.plan.getPlans().size(), 1); + + sp = planRes.plan.getPlans().get(0); + startRowSuffix = sp.getStartRowSuffix("testdb", "testtb", parts); + endRowSuffix = sp.getEndRowSuffix("testdb", "testtb", parts); + filter = (RowFilter)sp.getFilter(parts); + + // scan range contains the major key value year (low bound), rowfilter contains minor key state + // and LIKE operator + Assert.assertTrue(Bytes.contains(startRowSuffix, "2014".getBytes())); + + comparator = 
(PartitionKeyComparator)filter.getComparator(); + Assert.assertEquals(comparator.ranges.size(), 1); + Assert.assertEquals(comparator.ranges.get(0).keyName, "month"); + Assert.assertEquals(comparator.ops.size(), 1); + Assert.assertEquals(comparator.ops.get(0).keyName, "state"); + + // Condition contains or + exprTree = PartFilterExprUtil.getFilterParser("year = 2014 and (month > 10 " + + "or month < 3)").tree; + planRes = HBaseFilterPlanUtil.getFilterPlan(exprTree, parts); + + sp = planRes.plan.getPlans().get(0); + startRowSuffix = sp.getStartRowSuffix("testdb", "testtb", parts); + endRowSuffix = sp.getEndRowSuffix("testdb", "testtb", parts); + filter = (RowFilter)sp.getFilter(parts); + + // The first ScanPlan contains year = 2014 and month > 10 + Assert.assertTrue(Bytes.contains(startRowSuffix, "2014".getBytes())); + Assert.assertTrue(Bytes.contains(endRowSuffix, "2014".getBytes())); + Assert.assertTrue(Bytes.contains(startRowSuffix, "10".getBytes())); + + sp = planRes.plan.getPlans().get(1); + startRowSuffix = sp.getStartRowSuffix("testdb", "testtb", parts); + endRowSuffix = sp.getEndRowSuffix("testdb", "testtb", parts); + filter = (RowFilter)sp.getFilter(parts); + + // The first ScanPlan contains year = 2014 and month < 3 + Assert.assertTrue(Bytes.contains(startRowSuffix, "2014".getBytes())); + Assert.assertTrue(Bytes.contains(endRowSuffix, "2014".getBytes())); + Assert.assertTrue(Bytes.contains(endRowSuffix, "3".getBytes())); } + @Test + public void testPartitionKeyScannerMixedType() throws Exception { + List<FieldSchema> parts = new ArrayList<FieldSchema>(); + parts.add(new FieldSchema("year", "int", null)); + parts.add(new FieldSchema("month", "int", null)); + parts.add(new FieldSchema("state", "string", null)); + + // One prefix key and one minor key range + ExpressionTree exprTree = PartFilterExprUtil.getFilterParser("year = 2015 and state = 'CA'").tree; + PlanResult planRes = HBaseFilterPlanUtil.getFilterPlan(exprTree, parts); + + 
Assert.assertEquals(planRes.plan.getPlans().size(), 1); + + ScanPlan sp = planRes.plan.getPlans().get(0); + byte[] startRowSuffix = sp.getStartRowSuffix("testdb", "testtb", parts); + byte[] endRowSuffix = sp.getEndRowSuffix("testdb", "testtb", parts); + RowFilter filter = (RowFilter)sp.getFilter(parts); + + // scan range contains the major key year, rowfilter contains minor key state + Assert.assertTrue(Bytes.contains(startRowSuffix, Shorts.toByteArray((short)2015))); + Assert.assertTrue(Bytes.contains(endRowSuffix, Shorts.toByteArray((short)2016))); + + PartitionKeyComparator comparator = (PartitionKeyComparator)filter.getComparator(); + Assert.assertEquals(comparator.ranges.size(), 1); + Assert.assertEquals(comparator.ranges.get(0).keyName, "state"); + } } http://git-wip-us.apache.org/repos/asf/hive/blob/5e16d53e/serde/src/java/org/apache/hadoop/hive/serde2/binarysortable/BinarySortableSerDeWithEndPrefix.java ---------------------------------------------------------------------- diff --git a/serde/src/java/org/apache/hadoop/hive/serde2/binarysortable/BinarySortableSerDeWithEndPrefix.java b/serde/src/java/org/apache/hadoop/hive/serde2/binarysortable/BinarySortableSerDeWithEndPrefix.java new file mode 100644 index 0000000..ec43ae3 --- /dev/null +++ b/serde/src/java/org/apache/hadoop/hive/serde2/binarysortable/BinarySortableSerDeWithEndPrefix.java @@ -0,0 +1,41 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.serde2.binarysortable;
+
+import java.util.List;
+
+import org.apache.hadoop.hive.serde2.SerDeException;
+import org.apache.hadoop.hive.serde2.ByteStream.Output;
+import org.apache.hadoop.hive.serde2.binarysortable.BinarySortableSerDe;
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
+
+/**
+ * Variant of {@link BinarySortableSerDe} that can serialize a sequence of fields either
+ * as-is or as an "end prefix": the same bytes adjusted upwards so that they compare
+ * greater than the plain serialization of those fields.
+ */
+public class BinarySortableSerDeWithEndPrefix extends BinarySortableSerDe {
+
+  /**
+   * Serializes {@code fieldData} in order (non-inverted) onto {@code byteStream} and, when
+   * {@code endPrefix} is set, bumps the bytes just written so they sort after the plain
+   * serialization of the same fields.
+   *
+   * @param byteStream destination; bytes are appended after whatever it already holds
+   * @param fieldData values to serialize, one per field
+   * @param fieldOis object inspectors, {@code fieldOis.get(i)} describing {@code fieldData[i]}
+   * @param endPrefix whether to turn the serialized bytes into an end prefix
+   * @throws SerDeException if serialization fails, or if {@code endPrefix} is requested but
+   *         no end prefix can be derived (nothing was serialized, or every serialized
+   *         byte is already 0xFF)
+   */
+  public static void serializeStruct(Output byteStream, Object[] fieldData,
+      List<ObjectInspector> fieldOis, boolean endPrefix) throws SerDeException {
+    // Remember where this struct starts so the end-prefix adjustment can never touch
+    // bytes the caller appended to the stream earlier.
+    final int start = byteStream.getLength();
+    for (int i = 0; i < fieldData.length; i++) {
+      serialize(byteStream, fieldData[i], fieldOis.get(i), false);
+    }
+    if (!endPrefix) {
+      return;
+    }
+
+    final int end = byteStream.getLength();
+    if (fieldData.length == 0 || end == start) {
+      // Previously this surfaced as an unchecked ArrayIndexOutOfBoundsException.
+      throw new SerDeException("Cannot build an end prefix: no field was serialized");
+    }
+
+    // Fetch the backing array only after serialization has finished writing.
+    final byte[] data = byteStream.getData();
+    if (fieldData[fieldData.length - 1] != null) {
+      incrementWithCarry(data, start, end);
+    } else {
+      // NOTE(review): presumably serialize() emits a lone 0 marker for null and a 1 marker
+      // for non-null, so +2 steps past both -- confirm against BinarySortableSerDe.serialize.
+      data[end - 1] += 2;
+    }
+  }
+
+  /**
+   * Adds one to the unsigned big-endian number stored in {@code data[start, end)}.
+   *
+   * A plain {@code data[end - 1]++} wraps a trailing 0xFF byte to 0x00 without carrying,
+   * which yields bytes that sort before the original instead of after it.
+   */
+  private static void incrementWithCarry(byte[] data, int start, int end)
+      throws SerDeException {
+    // Locate the right-most byte that can absorb the increment before mutating anything,
+    // so a failure leaves the stream untouched.
+    int pos = end - 1;
+    while (pos >= start && data[pos] == (byte) 0xFF) {
+      pos--;
+    }
+    if (pos < start) {
+      throw new SerDeException("Cannot build an end prefix: serialized bytes are all 0xFF");
+    }
+    data[pos]++;
+    // Every byte to the right was 0xFF and rolls over to 0x00 as the carry passes through.
+    for (int i = pos + 1; i < end; i++) {
+      data[i] = 0;
+    }
+  }
+}
\ No newline at end of file