Fixes for memory management and hbase reader.

Project: http://git-wip-us.apache.org/repos/asf/incubator-drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-drill/commit/3cbcfaa4
Tree: http://git-wip-us.apache.org/repos/asf/incubator-drill/tree/3cbcfaa4
Diff: http://git-wip-us.apache.org/repos/asf/incubator-drill/diff/3cbcfaa4

Branch: refs/heads/master
Commit: 3cbcfaa4b0ff15ede9894185c015f52660a703d4
Parents: 7f0491c
Author: Jacques Nadeau <[email protected]>
Authored: Tue May 6 11:01:33 2014 -0700
Committer: Jacques Nadeau <[email protected]>
Committed: Tue May 6 11:01:33 2014 -0700

----------------------------------------------------------------------
 .../exec/store/hbase/HBaseRecordReader.java     | 25 ++++++++------------
 .../drill/hbase/HBaseRecordReaderTest.java      |  1 +
 .../org/apache/drill/exec/vector/BitVector.java | 15 ++++++------
 3 files changed, 19 insertions(+), 22 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/3cbcfaa4/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseRecordReader.java
----------------------------------------------------------------------
diff --git a/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseRecordReader.java b/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseRecordReader.java
index 946ee40..aa5743f 100644
--- a/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseRecordReader.java
+++ b/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseRecordReader.java
@@ -67,7 +67,6 @@ public class HBaseRecordReader implements RecordReader, DrillHBaseConstants {
   private Scan scan;
   private ResultScanner resultScanner;
   private FragmentContext context;
-  private BufferAllocator allocator;
   Map<FamilyQualifierWrapper, NullableVarBinaryVector> vvMap;
   private Result leftOver;
   private VarBinaryVector rowKeyVector;
@@ -79,7 +78,6 @@ public class HBaseRecordReader implements RecordReader, DrillHBaseConstants {
     this.scan = new Scan(e.getStartRow(), e.getStopRow());
     this.scan.setFilter(e.getScanFilter());
     this.context = context;
-    this.allocator = context.getNewChildAllocator(ALLOCATOR_INITIAL_RESERVATION, ALLOCATOR_MAX_RESERVATION);
     if (columns != null && columns.size() != 0) {
       for (SchemaPath column : columns) {
         if (column.getRootSegment().getPath().toString().equalsIgnoreCase(ROW_KEY)) {
@@ -129,12 +127,11 @@ public class HBaseRecordReader implements RecordReader, DrillHBaseConstants {
       try {
         if (column.equals(rowKeySchemaPath)) {
           MaterializedField field = MaterializedField.create(column, Types.required(TypeProtos.MinorType.VARBINARY));
-          rowKeyVector = new VarBinaryVector(field, allocator);
-          output.addField(rowKeyVector);
+
+          rowKeyVector = output.addField(field, VarBinaryVector.class);
         } else if (column.getRootSegment().getChild() != null){
           MaterializedField field = MaterializedField.create(column, Types.optional(TypeProtos.MinorType.VARBINARY));
-          NullableVarBinaryVector v = new NullableVarBinaryVector(field, allocator);
-          output.addField(v);
+          NullableVarBinaryVector v = output.addField(field, NullableVarBinaryVector.class);
           String fullyQualified = column.getRootSegment().getPath() + "." + column.getRootSegment().getChild().getNameSegment().getPath();
           vvMap.put(new FamilyQualifierWrapper(fullyQualified), v);
         }
@@ -156,11 +153,11 @@ public class HBaseRecordReader implements RecordReader, DrillHBaseConstants {
     watch.start();
     if (rowKeyVector != null) {
       rowKeyVector.clear();
-      VectorAllocator.getAllocator(rowKeyVector, 100).alloc(TARGET_RECORD_COUNT);
+      rowKeyVector.allocateNew();
     }
     for (ValueVector v : vvMap.values()) {
       v.clear();
-      VectorAllocator.getAllocator(v, 100).alloc(TARGET_RECORD_COUNT);
+      v.allocateNew();
     }
     for (int count = 0; count < TARGET_RECORD_COUNT; count++) {
       Result result = null;
@@ -214,19 +211,17 @@ public class HBaseRecordReader implements RecordReader, DrillHBaseConstants {
     return TARGET_RECORD_COUNT;
   }
 
-  @SuppressWarnings("deprecation")
   private NullableVarBinaryVector addNewVector(String column) {
-    MaterializedField field = MaterializedField.create(SchemaPath.getCompoundPath(column.split("\\.")), Types.optional(TypeProtos.MinorType.VARBINARY));
-    NullableVarBinaryVector v = new NullableVarBinaryVector(field, allocator);
-    VectorAllocator.getAllocator(v, 100).alloc(TARGET_RECORD_COUNT);
-    vvMap.put(new FamilyQualifierWrapper(column), v);
     try {
-      outputMutator.addField(v);
+      MaterializedField field = MaterializedField.create(SchemaPath.getCompoundPath(column.split("\\.")), Types.optional(TypeProtos.MinorType.VARBINARY));
+      NullableVarBinaryVector v = outputMutator.addField(field, NullableVarBinaryVector.class);
+      v.allocateNew();
+      vvMap.put(new FamilyQualifierWrapper(column), v);
       outputMutator.setNewSchema();
+      return v;
     } catch (SchemaChangeException e) {
       throw new DrillRuntimeException(e);
     }
-    return v;
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/3cbcfaa4/contrib/storage-hbase/src/test/java/org/apache/drill/hbase/HBaseRecordReaderTest.java
----------------------------------------------------------------------
diff --git a/contrib/storage-hbase/src/test/java/org/apache/drill/hbase/HBaseRecordReaderTest.java b/contrib/storage-hbase/src/test/java/org/apache/drill/hbase/HBaseRecordReaderTest.java
index e76d867..078df1f 100644
--- a/contrib/storage-hbase/src/test/java/org/apache/drill/hbase/HBaseRecordReaderTest.java
+++ b/contrib/storage-hbase/src/test/java/org/apache/drill/hbase/HBaseRecordReaderTest.java
@@ -100,6 +100,7 @@ public class HBaseRecordReaderTest extends PopUnitTestBase {
         batchLoader.load(b.getHeader().getDef(), b.getData());
         VectorUtil.showVectorAccessibleContent(batchLoader);
         recordCount += batchLoader.getRecordCount();
+        if(b.getData() != null) b.getData().release();
       }
 
       Assert.assertEquals(records, recordCount);

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/3cbcfaa4/exec/java-exec/src/main/java/org/apache/drill/exec/vector/BitVector.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/vector/BitVector.java b/exec/java-exec/src/main/java/org/apache/drill/exec/vector/BitVector.java
index 155d7d6..63384a3 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/vector/BitVector.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/vector/BitVector.java
@@ -72,11 +72,12 @@ public final class BitVector extends BaseDataValueVector implements FixedWidthVe
     } else if (allocationMonitor < -5) {
       allocationValueCount = (int) (allocationValueCount * 1.1);
     }
+    allocateNew(allocationValueCount);
   }
 
   /**
    * Allocate a new memory space for this vector. Must be called prior to using the ValueVector.
-   * 
+   *
    * @param valueCount
    *          The number of values which can be contained within this vector.
    */
@@ -103,7 +104,7 @@ public final class BitVector extends BaseDataValueVector implements FixedWidthVe
   public void copyFrom(int inIndex, int outIndex, BitVector from) {
     this.mutator.set(outIndex, from.accessor.get(inIndex));
   }
-  
+
   public boolean copyFromSafe(int inIndex, int outIndex, BitVector from){
     if(outIndex >= this.getValueCapacity()) return false;
     copyFrom(inIndex, outIndex, from);
@@ -213,7 +214,7 @@ public final class BitVector extends BaseDataValueVector implements FixedWidthVe
 
     /**
      * Get the byte holding the desired bit, then mask all other bits. Iff the result is 0, the bit was not set.
-     * 
+     *
      * @param index
      *          position of the bit in the vector
      * @return 1 if set, otherwise 0
@@ -230,7 +231,7 @@ public final class BitVector extends BaseDataValueVector implements FixedWidthVe
     public boolean isNull(int index){
       return false;
     }
-    
+
     @Override
     public final Object getObject(int index) {
       return new Boolean(get(index) != 0);
@@ -252,7 +253,7 @@ public final class BitVector extends BaseDataValueVector implements FixedWidthVe
   /**
    * MutableBit implements a vector of bit-width values. Elements in the vector are accessed by position from the
    * logical start of the vector. Values should be pushed onto the vector sequentially, but may be randomly accessed.
-   * 
+   *
    * NB: this class is automatically generated from ValueVectorTypes.tdd using FreeMarker.
    */
   public class Mutator extends BaseMutator {
@@ -262,7 +263,7 @@ public final class BitVector extends BaseDataValueVector implements FixedWidthVe
 
     /**
      * Set the bit at the given index to the specified value.
-     * 
+     *
      * @param index
      *          position of the bit to set
      * @param value
@@ -287,7 +288,7 @@ public final class BitVector extends BaseDataValueVector implements FixedWidthVe
     final void set(int index, NullableBitHolder holder) {
       set(index, holder.value);
     }
-    
+
     public boolean setSafe(int index, int value) {
       if(index >= getValueCapacity()) {
         allocationMonitor--;

Reply via email to