DRILL-781: Use MapVector as the top level vector for HBase Column Families
Project: http://git-wip-us.apache.org/repos/asf/incubator-drill/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-drill/commit/5d7e3d3a Tree: http://git-wip-us.apache.org/repos/asf/incubator-drill/tree/5d7e3d3a Diff: http://git-wip-us.apache.org/repos/asf/incubator-drill/diff/5d7e3d3a Branch: refs/heads/master Commit: 5d7e3d3ab548eb2b23607df46ea843a9c1532b72 Parents: e9e63c4 Author: Aditya Kishore <[email protected]> Authored: Mon May 19 15:28:09 2014 -0700 Committer: Jacques Nadeau <[email protected]> Committed: Mon May 19 18:06:36 2014 -0700 ---------------------------------------------------------------------- .../exec/store/hbase/HBaseRecordReader.java | 160 ++++++------------- .../exec/store/hbase/HBaseSchemaFactory.java | 1 - .../org/apache/drill/hbase/HBaseTestsSuite.java | 1 - .../drill/hbase/TestHBaseFilterPushDown.java | 16 +- .../drill/hbase/TestHBaseProjectPushDown.java | 7 +- .../drill/exec/record/MaterializedField.java | 3 +- 6 files changed, 69 insertions(+), 119 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/5d7e3d3a/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseRecordReader.java ---------------------------------------------------------------------- diff --git a/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseRecordReader.java b/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseRecordReader.java index ae9f833..439f97f 100644 --- a/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseRecordReader.java +++ b/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseRecordReader.java @@ -20,6 +20,7 @@ package org.apache.drill.exec.store.hbase; import java.io.IOException; import java.util.HashMap; import java.util.Iterator; +import java.util.LinkedHashSet; import java.util.List; import java.util.Map; import java.util.concurrent.TimeUnit; @@ -40,6 +41,7 @@ import org.apache.drill.exec.store.RecordReader; import org.apache.drill.exec.vector.NullableVarBinaryVector; import org.apache.drill.exec.vector.ValueVector; import org.apache.drill.exec.vector.VarBinaryVector; +import org.apache.drill.exec.vector.complex.MapVector; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.KeyValue; @@ -50,17 +52,17 @@ import org.apache.hadoop.hbase.client.Scan; import org.apache.hadoop.hbase.filter.FirstKeyOnlyFilter; import com.google.common.base.Stopwatch; -import com.google.common.collect.Lists; +import com.google.common.collect.Sets; public class HBaseRecordReader implements RecordReader, DrillHBaseConstants { private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(HBaseRecordReader.class); private static final int TARGET_RECORD_COUNT = 4000; - private List<SchemaPath> columns; + private LinkedHashSet<SchemaPath> columns; private OutputMutator outputMutator; - private Map<FamilyQualifierWrapper, NullableVarBinaryVector> vvMap; + private Map<String, MapVector> familyVectorMap; private VarBinaryVector rowKeyVector; private SchemaPath rowKeySchemaPath; @@ -78,34 +80,29 @@ public class HBaseRecordReader implements RecordReader, DrillHBaseConstants { hbaseTable = subScanSpec.getTableName(); hbaseScan = new Scan(subScanSpec.getStartRow(), subScanSpec.getStopRow()); boolean rowKeyOnly = true; + this.columns = Sets.newLinkedHashSet(); if (projectedColumns != null && projectedColumns.size() != 0) { - /* - * This will change once the non-scaler value vectors are available. - * Then, each column family will have a single top level value vector - * and each column will be an item vector in its corresponding TLV. - */ - this.columns = Lists.newArrayList(projectedColumns); - Iterator<SchemaPath> columnIterator = columns.iterator(); + Iterator<SchemaPath> columnIterator = projectedColumns.iterator(); while(columnIterator.hasNext()) { SchemaPath column = columnIterator.next(); - if (column.getRootSegment().getPath().toString().equalsIgnoreCase(ROW_KEY)) { + if (column.getRootSegment().getPath().equalsIgnoreCase(ROW_KEY)) { rowKeySchemaPath = ROW_KEY_PATH; + this.columns.add(rowKeySchemaPath); continue; } rowKeyOnly = false; NameSegment root = column.getRootSegment(); - byte[] family = root.getPath().toString().getBytes(); + byte[] family = root.getPath().getBytes(); + this.columns.add(SchemaPath.getSimplePath(root.getPath())); PathSegment child = root.getChild(); if (child != null && child.isNamed()) { - byte[] qualifier = child.getNameSegment().getPath().toString().getBytes(); + byte[] qualifier = child.getNameSegment().getPath().getBytes(); hbaseScan.addColumn(family, qualifier); } else { - columnIterator.remove(); hbaseScan.addFamily(family); } } } else { - this.columns = Lists.newArrayList(); rowKeyOnly = false; rowKeySchemaPath = ROW_KEY_PATH; this.columns.add(rowKeySchemaPath); @@ -128,16 +125,16 @@ public class HBaseRecordReader implements RecordReader, DrillHBaseConstants { @Override public void setup(OutputMutator output) throws ExecutionSetupException { this.outputMutator = output; - vvMap = new HashMap<FamilyQualifierWrapper, NullableVarBinaryVector>(); + familyVectorMap = new HashMap<String, MapVector>(); try { // Add Vectors to output in the order specified when creating reader for (SchemaPath column : columns) { if (column.equals(rowKeySchemaPath)) { MaterializedField field = MaterializedField.create(column, Types.required(TypeProtos.MinorType.VARBINARY)); - rowKeyVector = output.addField(field, VarBinaryVector.class); - } else if (column.getRootSegment().getChild() != null) { - getOrCreateColumnVector(new FamilyQualifierWrapper(column), false); + rowKeyVector = outputMutator.addField(field, VarBinaryVector.class); + } else { + getOrCreateFamilyVector(column.getRootSegment().getPath(), false); } } logger.debug("Opening scanner for HBase table '{}', Zookeeper quorum '{}', port '{}', znode '{}'.", @@ -158,12 +155,14 @@ public class HBaseRecordReader implements RecordReader, DrillHBaseConstants { rowKeyVector.clear(); rowKeyVector.allocateNew(); } - for (ValueVector v : vvMap.values()) { + for (ValueVector v : familyVectorMap.values()) { v.clear(); v.allocateNew(); } - for (int count = 0; count < TARGET_RECORD_COUNT; count++) { + int rowCount = 0; + done: + for (; rowCount < TARGET_RECORD_COUNT; rowCount++) { Result result = null; try { if (leftOver != null) { @@ -176,54 +175,54 @@ public class HBaseRecordReader implements RecordReader, DrillHBaseConstants { throw new DrillRuntimeException(e); } if (result == null) { - setOutputValueCount(count); - logger.debug("Took {} ms to get {} records", watch.elapsed(TimeUnit.MILLISECONDS), count); - return count; + break done; } // parse the result and populate the value vectors KeyValue[] kvs = result.raw(); byte[] bytes = result.getBytes().get(); if (rowKeyVector != null) { - if (!rowKeyVector.getMutator().setSafe(count, bytes, kvs[0].getRowOffset(), kvs[0].getRowLength())) { - setOutputValueCount(count); + if (!rowKeyVector.getMutator().setSafe(rowCount, bytes, kvs[0].getRowOffset(), kvs[0].getRowLength())) { leftOver = result; - logger.debug("Took {} ms to get {} records", watch.elapsed(TimeUnit.MILLISECONDS), count); - return count; + break done; } } + for (KeyValue kv : kvs) { int familyOffset = kv.getFamilyOffset(); int familyLength = kv.getFamilyLength(); + MapVector mv = getOrCreateFamilyVector(new String(bytes, familyOffset, familyLength), true); + int qualifierOffset = kv.getQualifierOffset(); int qualifierLength = kv.getQualifierLength(); + NullableVarBinaryVector v = getOrCreateColumnVector(mv, new String(bytes, qualifierOffset, qualifierLength)); + int valueOffset = kv.getValueOffset(); int valueLength = kv.getValueLength(); - NullableVarBinaryVector v = getOrCreateColumnVector( - new FamilyQualifierWrapper(bytes, familyOffset, familyLength, qualifierOffset, qualifierLength), true); - if (!v.getMutator().setSafe(count, bytes, valueOffset, valueLength)) { - setOutputValueCount(count); + if (!v.getMutator().setSafe(rowCount, bytes, valueOffset, valueLength)) { leftOver = result; - logger.debug("Took {} ms to get {} records", watch.elapsed(TimeUnit.MILLISECONDS), count); - return count; + return rowCount; } } } - setOutputValueCount(TARGET_RECORD_COUNT); - logger.debug("Took {} ms to get {} records", watch.elapsed(TimeUnit.MILLISECONDS), TARGET_RECORD_COUNT); - return TARGET_RECORD_COUNT; + + setOutputRowCount(rowCount); + logger.debug("Took {} ms to get {} records", watch.elapsed(TimeUnit.MILLISECONDS), rowCount); + return rowCount; } - private NullableVarBinaryVector getOrCreateColumnVector(FamilyQualifierWrapper column, boolean allocateOnCreate) { + private MapVector getOrCreateFamilyVector(String familyName, boolean allocateOnCreate) { try { - NullableVarBinaryVector v = vvMap.get(column); + MapVector v = familyVectorMap.get(familyName); if(v == null) { - MaterializedField field = MaterializedField.create(column.asSchemaPath(), Types.optional(TypeProtos.MinorType.VARBINARY)); - v = outputMutator.addField(field, NullableVarBinaryVector.class); + SchemaPath column = SchemaPath.getSimplePath(familyName); + MaterializedField field = MaterializedField.create(column, Types.required(TypeProtos.MinorType.MAP)); + v = outputMutator.addField(field, MapVector.class); if (allocateOnCreate) { v.allocateNew(); } - vvMap.put(column, v); + columns.add(column); + familyVectorMap.put(familyName, v); } return v; } catch (SchemaChangeException e) { @@ -231,6 +230,15 @@ public class HBaseRecordReader implements RecordReader, DrillHBaseConstants { } } + private NullableVarBinaryVector getOrCreateColumnVector(MapVector mv, String qualifier) { + int oldSize = mv.size(); + NullableVarBinaryVector v = mv.addOrGet(qualifier, Types.optional(TypeProtos.MinorType.VARBINARY), NullableVarBinaryVector.class); + if (oldSize != mv.size()) { + v.allocateNew(); + } + return v; + } + @Override public void cleanup() { try { @@ -245,8 +253,8 @@ public class HBaseRecordReader implements RecordReader, DrillHBaseConstants { } } - private void setOutputValueCount(int count) { - for (ValueVector vv : vvMap.values()) { + private void setOutputRowCount(int count) { + for (ValueVector vv : familyVectorMap.values()) { vv.getMutator().setValueCount(count); } if (rowKeyVector != null) { @@ -254,68 +262,4 @@ public class HBaseRecordReader implements RecordReader, DrillHBaseConstants { } } - private static class FamilyQualifierWrapper implements Comparable<FamilyQualifierWrapper> { - int hashCode; - protected String stringVal; - protected String family; - protected String qualifier; - - public FamilyQualifierWrapper(SchemaPath column) { - this(column.getRootSegment().getPath(), column.getRootSegment().getChild().getNameSegment().getPath()); - } - - public FamilyQualifierWrapper(byte[] bytes, int familyOffset, int familyLength, int qualifierOffset, int qualifierLength) { - this(new String(bytes, familyOffset, familyLength), new String(bytes, qualifierOffset, qualifierLength)); - } - - public FamilyQualifierWrapper(String family, String qualifier) { - this.family = family; - this.qualifier = qualifier; - hashCode = 31*family.hashCode() + qualifier.hashCode(); - } - - @Override - public int hashCode() { - return this.hashCode; - } - - @Override - public boolean equals(Object anObject) { - if (this == anObject) { - return true; - } - if (anObject instanceof FamilyQualifierWrapper) { - FamilyQualifierWrapper that = (FamilyQualifierWrapper) anObject; - // we compare qualifier first since many columns will have same family - if (!qualifier.equals(that.qualifier)) { - return false; - } - return family.equals(that.family); - } - return false; - } - - @Override - public String toString() { - if (stringVal == null) { - stringVal = new StringBuilder().append(new String(family)).append(".").append(new String(qualifier)).toString(); - } - return stringVal; - } - - public SchemaPath asSchemaPath() { - return SchemaPath.getCompoundPath(family, qualifier); - } - - @Override - public int compareTo(FamilyQualifierWrapper o) { - int val = family.compareTo(o.family); - if (val != 0) { - return val; - } - return qualifier.compareTo(o.qualifier); - } - - } - } http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/5d7e3d3a/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseSchemaFactory.java ---------------------------------------------------------------------- diff --git a/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseSchemaFactory.java b/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseSchemaFactory.java index ce3b9fd..84f363b 100644 --- a/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseSchemaFactory.java +++ b/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseSchemaFactory.java @@ -29,7 +29,6 @@ import org.apache.drill.exec.planner.logical.DynamicDrillTable; import org.apache.drill.exec.rpc.user.UserSession; import org.apache.drill.exec.store.AbstractSchema; import org.apache.drill.exec.store.SchemaFactory; -import org.apache.drill.exec.store.dfs.FileSystemConfig; import org.apache.hadoop.hbase.HTableDescriptor; import org.apache.hadoop.hbase.client.HBaseAdmin; http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/5d7e3d3a/contrib/storage-hbase/src/test/java/org/apache/drill/hbase/HBaseTestsSuite.java ---------------------------------------------------------------------- diff --git a/contrib/storage-hbase/src/test/java/org/apache/drill/hbase/HBaseTestsSuite.java b/contrib/storage-hbase/src/test/java/org/apache/drill/hbase/HBaseTestsSuite.java index 3881f4d..e30f79e 100644 --- a/contrib/storage-hbase/src/test/java/org/apache/drill/hbase/HBaseTestsSuite.java +++ b/contrib/storage-hbase/src/test/java/org/apache/drill/hbase/HBaseTestsSuite.java @@ -28,7 +28,6 @@ import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.client.HBaseAdmin; import org.junit.AfterClass; import org.junit.BeforeClass; -import org.junit.Ignore; import org.junit.runner.RunWith; import org.junit.runners.Suite; import org.junit.runners.Suite.SuiteClasses; http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/5d7e3d3a/contrib/storage-hbase/src/test/java/org/apache/drill/hbase/TestHBaseFilterPushDown.java ---------------------------------------------------------------------- diff --git a/contrib/storage-hbase/src/test/java/org/apache/drill/hbase/TestHBaseFilterPushDown.java b/contrib/storage-hbase/src/test/java/org/apache/drill/hbase/TestHBaseFilterPushDown.java index 76300b7..90404b7 100644 --- a/contrib/storage-hbase/src/test/java/org/apache/drill/hbase/TestHBaseFilterPushDown.java +++ b/contrib/storage-hbase/src/test/java/org/apache/drill/hbase/TestHBaseFilterPushDown.java @@ -17,11 +17,6 @@ */ package org.apache.drill.hbase; -import java.nio.ByteBuffer; -import java.nio.ByteOrder; -import java.util.Arrays; - -import org.junit.Ignore; import org.junit.Test; public class TestHBaseFilterPushDown extends BaseHBaseTest { @@ -49,6 +44,17 @@ public class TestHBaseFilterPushDown extends BaseHBaseTest { } @Test + public void testFilterPushDownRowKeyBetween() throws Exception { + runSQLVerifyCount("SELECT\n" + + " *\n" + + "FROM\n" + + " hbase.`[TABLE_NAME]` tableName\n" + + "WHERE\n" + + " row_key BETWEEN 'a2' AND 'b4'" + , 3); + } + + @Test public void testFilterPushDownMultiColumns() throws Exception { runSQLVerifyCount("SELECT\n" + " *\n" http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/5d7e3d3a/contrib/storage-hbase/src/test/java/org/apache/drill/hbase/TestHBaseProjectPushDown.java ---------------------------------------------------------------------- diff --git a/contrib/storage-hbase/src/test/java/org/apache/drill/hbase/TestHBaseProjectPushDown.java b/contrib/storage-hbase/src/test/java/org/apache/drill/hbase/TestHBaseProjectPushDown.java index 88194d5..b66d2ed 100644 --- a/contrib/storage-hbase/src/test/java/org/apache/drill/hbase/TestHBaseProjectPushDown.java +++ b/contrib/storage-hbase/src/test/java/org/apache/drill/hbase/TestHBaseProjectPushDown.java @@ -33,7 +33,7 @@ public class TestHBaseProjectPushDown extends BaseHBaseTest { @Test public void testColumnWith1RowPushDown() throws Exception{ runSQLVerifyCount("SELECT\n" - + "f2['c7']\n" + + "f2['c7'] as `f[c7]`\n" + "FROM\n" + " hbase.`[TABLE_NAME]` tableName" , 1); @@ -43,7 +43,7 @@ public class TestHBaseProjectPushDown extends BaseHBaseTest { public void testRowKeyAndColumnPushDown() throws Exception{ setColumnWidth(9); runSQLVerifyCount("SELECT\n" - + "row_key, f['c1']*31 as `f['c1']*31`, f['c2'] as `f['c2']`, 5 as `5`, 'abc' as `'abc'`\n" + + "row_key, f['c1']*31 as `f[c1]*31`, f['c2'] as `f[c2]`, 5 as `5`, 'abc' as `'abc'`\n" + "FROM\n" + " hbase.`[TABLE_NAME]` tableName" , 6); @@ -51,8 +51,9 @@ public class TestHBaseProjectPushDown extends BaseHBaseTest { @Test public void testColumnFamilyPushDown() throws Exception{ + setColumnWidth(74); runSQLVerifyCount("SELECT\n" - + "f\n" + + "f, f2\n" + "FROM\n" + " hbase.`[TABLE_NAME]` tableName" , 6); http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/5d7e3d3a/exec/java-exec/src/main/java/org/apache/drill/exec/record/MaterializedField.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/record/MaterializedField.java b/exec/java-exec/src/main/java/org/apache/drill/exec/record/MaterializedField.java index 439552f..3d749d6 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/record/MaterializedField.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/record/MaterializedField.java @@ -24,6 +24,7 @@ import org.apache.drill.common.expression.PathSegment; import org.apache.drill.common.expression.SchemaPath; import org.apache.drill.common.types.TypeProtos.DataMode; import org.apache.drill.common.types.TypeProtos.MajorType; +import org.apache.drill.common.types.Types; import org.apache.drill.exec.expr.TypeHelper; import org.apache.drill.exec.proto.UserBitShared.SerializedField; @@ -202,7 +203,7 @@ public class MaterializedField{ @Override public String toString() { - return "MaterializedField [path=" + path + ", type=" + type + "]"; + return "MaterializedField [path=" + path + ", type=" + Types.toString(type) + "]"; } public String toExpr(){
