Fixes for memory management and hbase reader.
Project: http://git-wip-us.apache.org/repos/asf/incubator-drill/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-drill/commit/3cbcfaa4 Tree: http://git-wip-us.apache.org/repos/asf/incubator-drill/tree/3cbcfaa4 Diff: http://git-wip-us.apache.org/repos/asf/incubator-drill/diff/3cbcfaa4 Branch: refs/heads/master Commit: 3cbcfaa4b0ff15ede9894185c015f52660a703d4 Parents: 7f0491c Author: Jacques Nadeau <[email protected]> Authored: Tue May 6 11:01:33 2014 -0700 Committer: Jacques Nadeau <[email protected]> Committed: Tue May 6 11:01:33 2014 -0700 ---------------------------------------------------------------------- .../exec/store/hbase/HBaseRecordReader.java | 25 ++++++++------------ .../drill/hbase/HBaseRecordReaderTest.java | 1 + .../org/apache/drill/exec/vector/BitVector.java | 15 ++++++------ 3 files changed, 19 insertions(+), 22 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/3cbcfaa4/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseRecordReader.java ---------------------------------------------------------------------- diff --git a/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseRecordReader.java b/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseRecordReader.java index 946ee40..aa5743f 100644 --- a/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseRecordReader.java +++ b/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseRecordReader.java @@ -67,7 +67,6 @@ public class HBaseRecordReader implements RecordReader, DrillHBaseConstants { private Scan scan; private ResultScanner resultScanner; private FragmentContext context; - private BufferAllocator allocator; Map<FamilyQualifierWrapper, NullableVarBinaryVector> vvMap; private Result leftOver; private VarBinaryVector rowKeyVector; @@ -79,7 +78,6 @@ public class HBaseRecordReader implements RecordReader, DrillHBaseConstants { this.scan = new Scan(e.getStartRow(), e.getStopRow()); this.scan.setFilter(e.getScanFilter()); this.context = context; - this.allocator = context.getNewChildAllocator(ALLOCATOR_INITIAL_RESERVATION, ALLOCATOR_MAX_RESERVATION); if (columns != null && columns.size() != 0) { for (SchemaPath column : columns) { if (column.getRootSegment().getPath().toString().equalsIgnoreCase(ROW_KEY)) { @@ -129,12 +127,11 @@ public class HBaseRecordReader implements RecordReader, DrillHBaseConstants { try { if (column.equals(rowKeySchemaPath)) { MaterializedField field = MaterializedField.create(column, Types.required(TypeProtos.MinorType.VARBINARY)); - rowKeyVector = new VarBinaryVector(field, allocator); - output.addField(rowKeyVector); + + rowKeyVector = output.addField(field, VarBinaryVector.class); } else if (column.getRootSegment().getChild() != null){ MaterializedField field = MaterializedField.create(column, Types.optional(TypeProtos.MinorType.VARBINARY)); - NullableVarBinaryVector v = new NullableVarBinaryVector(field, allocator); - output.addField(v); + NullableVarBinaryVector v = output.addField(field, NullableVarBinaryVector.class); String fullyQualified = column.getRootSegment().getPath() + "." + column.getRootSegment().getChild().getNameSegment().getPath(); vvMap.put(new FamilyQualifierWrapper(fullyQualified), v); } @@ -156,11 +153,11 @@ public class HBaseRecordReader implements RecordReader, DrillHBaseConstants { watch.start(); if (rowKeyVector != null) { rowKeyVector.clear(); - VectorAllocator.getAllocator(rowKeyVector, 100).alloc(TARGET_RECORD_COUNT); + rowKeyVector.allocateNew(); } for (ValueVector v : vvMap.values()) { v.clear(); - VectorAllocator.getAllocator(v, 100).alloc(TARGET_RECORD_COUNT); + v.allocateNew(); } for (int count = 0; count < TARGET_RECORD_COUNT; count++) { Result result = null; @@ -214,19 +211,17 @@ public class HBaseRecordReader implements RecordReader, DrillHBaseConstants { return TARGET_RECORD_COUNT; } - @SuppressWarnings("deprecation") private NullableVarBinaryVector addNewVector(String column) { - MaterializedField field = MaterializedField.create(SchemaPath.getCompoundPath(column.split("\\.")), Types.optional(TypeProtos.MinorType.VARBINARY)); - NullableVarBinaryVector v = new NullableVarBinaryVector(field, allocator); - VectorAllocator.getAllocator(v, 100).alloc(TARGET_RECORD_COUNT); - vvMap.put(new FamilyQualifierWrapper(column), v); try { - outputMutator.addField(v); + MaterializedField field = MaterializedField.create(SchemaPath.getCompoundPath(column.split("\\.")), Types.optional(TypeProtos.MinorType.VARBINARY)); + NullableVarBinaryVector v = outputMutator.addField(field, NullableVarBinaryVector.class); + v.allocateNew(); + vvMap.put(new FamilyQualifierWrapper(column), v); outputMutator.setNewSchema(); + return v; } catch (SchemaChangeException e) { throw new DrillRuntimeException(e); } - return v; } @Override http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/3cbcfaa4/contrib/storage-hbase/src/test/java/org/apache/drill/hbase/HBaseRecordReaderTest.java ---------------------------------------------------------------------- diff --git a/contrib/storage-hbase/src/test/java/org/apache/drill/hbase/HBaseRecordReaderTest.java b/contrib/storage-hbase/src/test/java/org/apache/drill/hbase/HBaseRecordReaderTest.java index e76d867..078df1f 100644 --- a/contrib/storage-hbase/src/test/java/org/apache/drill/hbase/HBaseRecordReaderTest.java +++ b/contrib/storage-hbase/src/test/java/org/apache/drill/hbase/HBaseRecordReaderTest.java @@ -100,6 +100,7 @@ public class HBaseRecordReaderTest extends PopUnitTestBase { batchLoader.load(b.getHeader().getDef(), b.getData()); VectorUtil.showVectorAccessibleContent(batchLoader); recordCount += batchLoader.getRecordCount(); + if(b.getData() != null) b.getData().release(); } Assert.assertEquals(records, recordCount); http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/3cbcfaa4/exec/java-exec/src/main/java/org/apache/drill/exec/vector/BitVector.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/vector/BitVector.java b/exec/java-exec/src/main/java/org/apache/drill/exec/vector/BitVector.java index 155d7d6..63384a3 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/vector/BitVector.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/vector/BitVector.java @@ -72,11 +72,12 @@ public final class BitVector extends BaseDataValueVector implements FixedWidthVe } else if (allocationMonitor < -5) { allocationValueCount = (int) (allocationValueCount * 1.1); } + allocateNew(allocationValueCount); } /** * Allocate a new memory space for this vector. Must be called prior to using the ValueVector. - * + * * @param valueCount * The number of values which can be contained within this vector. */ @@ -103,7 +104,7 @@ public final class BitVector extends BaseDataValueVector implements FixedWidthVe public void copyFrom(int inIndex, int outIndex, BitVector from) { this.mutator.set(outIndex, from.accessor.get(inIndex)); } - + public boolean copyFromSafe(int inIndex, int outIndex, BitVector from){ if(outIndex >= this.getValueCapacity()) return false; copyFrom(inIndex, outIndex, from); @@ -213,7 +214,7 @@ public final class BitVector extends BaseDataValueVector implements FixedWidthVe /** * Get the byte holding the desired bit, then mask all other bits. Iff the result is 0, the bit was not set. - * + * * @param index * position of the bit in the vector * @return 1 if set, otherwise 0 @@ -230,7 +231,7 @@ public final class BitVector extends BaseDataValueVector implements FixedWidthVe public boolean isNull(int index){ return false; } - + @Override public final Object getObject(int index) { return new Boolean(get(index) != 0); @@ -252,7 +253,7 @@ public final class BitVector extends BaseDataValueVector implements FixedWidthVe /** * MutableBit implements a vector of bit-width values. Elements in the vector are accessed by position from the * logical start of the vector. Values should be pushed onto the vector sequentially, but may be randomly accessed. - * + * * NB: this class is automatically generated from ValueVectorTypes.tdd using FreeMarker. */ public class Mutator extends BaseMutator { @@ -262,7 +263,7 @@ public final class BitVector extends BaseDataValueVector implements FixedWidthVe /** * Set the bit at the given index to the specified value. - * + * * @param index * position of the bit to set * @param value @@ -287,7 +288,7 @@ public final class BitVector extends BaseDataValueVector implements FixedWidthVe final void set(int index, NullableBitHolder holder) { set(index, holder.value); } - + public boolean setSafe(int index, int value) { if(index >= getValueCapacity()) { allocationMonitor--;
