http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/5c5cef06/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/IndexPointer.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/IndexPointer.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/IndexPointer.java
new file mode 100644
index 0000000..020c5d8
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/IndexPointer.java
@@ -0,0 +1,24 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.physical.impl.common;
+
+public class IndexPointer {
+  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(IndexPointer.class);
+
+  public int value;
+}

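Note: IndexPointer is deliberately a bare mutable int holder, so callers can receive an index through an out-parameter without reusing the expression-system IntHolder type (see the HashJoinBatch change below). A minimal self-contained sketch of the pattern it enables; the demo class and find method are illustrative, not part of this commit:

import org.apache.drill.exec.physical.impl.common.IndexPointer;

public class IndexPointerDemo {
  // The callee writes the result position into the holder, leaving the
  // boolean return free to signal whether a match was found.
  static boolean find(int[] haystack, int needle, IndexPointer out) {
    for (int i = 0; i < haystack.length; i++) {
      if (haystack[i] == needle) {
        out.value = i;
        return true;
      }
    }
    return false;
  }

  public static void main(String[] args) {
    IndexPointer idx = new IndexPointer();
    if (find(new int[]{5, 7, 9}, 7, idx)) {
      System.out.println("found at " + idx.value); // prints "found at 1"
    }
  }
}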
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/5c5cef06/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java
index 7233f69..15044b8 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java
@@ -18,7 +18,6 @@
 package org.apache.drill.exec.physical.impl.join;
 
 import java.io.IOException;
-import java.util.ArrayList;
 import java.util.List;
 
 import org.apache.drill.common.expression.FieldReference;
@@ -35,7 +34,6 @@ import org.apache.drill.exec.exception.SchemaChangeException;
 import org.apache.drill.exec.expr.ClassGenerator;
 import org.apache.drill.exec.expr.CodeGenerator;
 import org.apache.drill.exec.expr.TypeHelper;
-import org.apache.drill.exec.expr.holders.IntHolder;
 import org.apache.drill.exec.memory.OutOfMemoryException;
 import org.apache.drill.exec.ops.FragmentContext;
 import org.apache.drill.exec.ops.MetricDef;
@@ -44,8 +42,8 @@ import org.apache.drill.exec.physical.impl.common.ChainedHashTable;
 import org.apache.drill.exec.physical.impl.common.HashTable;
 import org.apache.drill.exec.physical.impl.common.HashTableConfig;
 import org.apache.drill.exec.physical.impl.common.HashTableStats;
+import org.apache.drill.exec.physical.impl.common.IndexPointer;
 import org.apache.drill.exec.physical.impl.sort.RecordBatchData;
-import org.apache.drill.exec.physical.impl.svremover.RemovingRecordBatch;
 import org.apache.drill.exec.record.AbstractRecordBatch;
 import org.apache.drill.exec.record.BatchSchema;
 import org.apache.drill.exec.record.ExpandableHyperContainer;
@@ -54,7 +52,6 @@ import org.apache.drill.exec.record.RecordBatch;
 import org.apache.drill.exec.record.TypedFieldId;
 import org.apache.drill.exec.record.VectorWrapper;
 import org.apache.drill.exec.vector.ValueVector;
-import org.apache.drill.exec.vector.allocator.VectorAllocator;
 import org.eigenbase.rel.JoinRelType;
 
 import com.sun.codemodel.JExpr;
@@ -155,7 +152,7 @@ public class HashJoinBatch extends AbstractRecordBatch<HashJoinPOP> {
       NUM_ENTRIES,
       NUM_RESIZING,
       RESIZING_TIME;
-      
+
      // duplicate for hash agg
 
       @Override
@@ -163,7 +160,7 @@ public class HashJoinBatch extends AbstractRecordBatch<HashJoinPOP> {
         return ordinal();
       }
     }
-    
+
     @Override
     public int getRecordCount() {
         return outputRecords;
@@ -339,7 +336,7 @@ public class HashJoinBatch extends AbstractRecordBatch<HashJoinPOP> {
                     hjHelper.addNewBatch(currentRecordCount);
 
                    // Holder contains the global index where the key is hashed into using the hash table
-                    IntHolder htIndex = new IntHolder();
+                    IndexPointer htIndex = new IndexPointer();
 
                    // For every record in the build batch, hash the key columns
                     for (int i = 0; i < currentRecordCount; i++) {
@@ -491,7 +488,9 @@ public class HashJoinBatch extends AbstractRecordBatch<HashJoinPOP> {
 
     @Override
     public void cleanup() {
-        hjHelper.clear();
+        if(hjHelper != null){
+          hjHelper.clear();
+        }
 
        // If we didn't receive any data, hyperContainer may be null, check before clearing
         if (hyperContainer != null) {

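Note: the cleanup hunk guards resources that may never have been created when no build-side data arrived. A hedged sketch of the resulting method; the body of the hyperContainer branch is assumed, since the diff truncates after its guard:

    @Override
    public void cleanup() {
        // hjHelper is only built once build-side batches arrive; skip if absent.
        if (hjHelper != null) {
          hjHelper.clear();
        }
        // If we didn't receive any data, hyperContainer may be null as well.
        if (hyperContainer != null) {
          hyperContainer.clear(); // assumed; not shown in the diff
        }
    }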
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/5c5cef06/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/DefaultSqlHandler.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/DefaultSqlHandler.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/DefaultSqlHandler.java
index c5d342f..994eea4 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/DefaultSqlHandler.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/DefaultSqlHandler.java
@@ -219,6 +219,7 @@ public class DefaultSqlHandler extends AbstractSqlHandler {
      phyRelNode = ProducerConsumerPrelVisitor.addProducerConsumerToScans(phyRelNode, (int) queueSize);
     }
 
+
     /* 6.)
      * if the client does not support complex types (Map, Repeated)
     * insert a project which would convert

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/5c5cef06/exec/java-exec/src/main/java/org/apache/drill/exec/record/DeadBuf.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/record/DeadBuf.java b/exec/java-exec/src/main/java/org/apache/drill/exec/record/DeadBuf.java
index 61c1689..ac26969 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/record/DeadBuf.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/record/DeadBuf.java
@@ -20,6 +20,7 @@ package org.apache.drill.exec.record;
 import io.netty.buffer.ByteBuf;
 import io.netty.buffer.ByteBufAllocator;
 import io.netty.buffer.ByteBufProcessor;
+import io.netty.buffer.DrillBuf;
 
 import java.io.IOException;
 import java.io.InputStream;
@@ -32,14 +33,14 @@ import java.nio.charset.Charset;
 
 public class DeadBuf extends ByteBuf {
  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(DeadBuf.class);
-  
+
  private static final String ERROR_MESSAGE = "Attempted to access a DeadBuf. This would happen if you attempted to interact with a buffer that has been moved or not yet initialized.";
-  
-  public static final DeadBuf DEAD_BUFFER = new DeadBuf();
+
+  public static final DrillBuf DEAD_BUFFER = null;
 
   private DeadBuf(){}
 
-  
+
   @Override
   public boolean isReadable(int size) {
     throw new UnsupportedOperationException(ERROR_MESSAGE);
@@ -654,7 +655,7 @@ public class DeadBuf extends ByteBuf {
 
   }
 
-  
+
   @Override
   public int bytesBefore(byte value) {
     throw new UnsupportedOperationException(ERROR_MESSAGE);
@@ -667,14 +668,14 @@ public class DeadBuf extends ByteBuf {
 
   }
 
-  
+
   @Override
   public int bytesBefore(int index, int length, byte value) {
     throw new UnsupportedOperationException(ERROR_MESSAGE);
 
   }
 
-  
+
   @Override
   public ByteBuf copy() {
     throw new UnsupportedOperationException(ERROR_MESSAGE);
@@ -839,6 +840,6 @@ public class DeadBuf extends ByteBuf {
   public String toString() {
     return null;
   }
-  
-  
+
+
 }

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/5c5cef06/exec/java-exec/src/main/java/org/apache/drill/exec/record/RawFragmentBatch.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/record/RawFragmentBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/record/RawFragmentBatch.java
index 0cca9cd..dbf17c4 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/record/RawFragmentBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/record/RawFragmentBatch.java
@@ -18,6 +18,7 @@
 package org.apache.drill.exec.record;
 
 import io.netty.buffer.ByteBuf;
+import io.netty.buffer.DrillBuf;
 
 import org.apache.drill.exec.proto.BitData.FragmentRecordBatch;
 import org.apache.drill.exec.rpc.RemoteConnection;
@@ -29,10 +30,10 @@ public class RawFragmentBatch {
 
   final RemoteConnection connection;
   final FragmentRecordBatch header;
-  final ByteBuf body;
+  final DrillBuf body;
   final ResponseSender sender;
 
-  public RawFragmentBatch(RemoteConnection connection, FragmentRecordBatch header, ByteBuf body, ResponseSender sender) {
+  public RawFragmentBatch(RemoteConnection connection, FragmentRecordBatch header, DrillBuf body, ResponseSender sender) {
     super();
     this.header = header;
     this.body = body;
@@ -45,7 +46,7 @@ public class RawFragmentBatch {
     return header;
   }
 
-  public ByteBuf getBody() {
+  public DrillBuf getBody() {
     return body;
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/5c5cef06/exec/java-exec/src/main/java/org/apache/drill/exec/record/RecordBatchLoader.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/record/RecordBatchLoader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/record/RecordBatchLoader.java
index 0caed7d..d756685 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/record/RecordBatchLoader.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/record/RecordBatchLoader.java
@@ -17,13 +17,12 @@
  */
 package org.apache.drill.exec.record;
 
-import io.netty.buffer.ByteBuf;
+import io.netty.buffer.DrillBuf;
 
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 
-import io.netty.buffer.EmptyByteBuf;
 import org.apache.drill.common.expression.SchemaPath;
 import org.apache.drill.exec.exception.SchemaChangeException;
 import org.apache.drill.exec.expr.TypeHelper;
@@ -57,7 +56,7 @@ public class RecordBatchLoader implements VectorAccessible, Iterable<VectorWrapp
    * @return Whether or not the schema changed since the previous load.
    * @throws SchemaChangeException
    */
-  public boolean load(RecordBatchDef def, ByteBuf buf) throws SchemaChangeException {
+  public boolean load(RecordBatchDef def, DrillBuf buf) throws SchemaChangeException {
 //    logger.debug("Loading record batch with def {} and data {}", def, buf);
     container.zeroVectors();
     this.valueCount = def.getRecordCount();
@@ -85,7 +84,7 @@ public class RecordBatchLoader implements VectorAccessible, Iterable<VectorWrapp
       }
      if (fmd.getValueCount() == 0 && (!fmd.hasGroupCount() || fmd.getGroupCount() == 0)) {
 //        v.clear();
-        v.load(fmd, new EmptyByteBuf(allocator.getUnderlyingAllocator()));
+        v.load(fmd, allocator.buffer(0));
       } else {
         v.load(fmd, buf.slice(bufOffset, fmd.getBufferLength()));
       }

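Note: the load change backs empty vectors with a zero-length buffer from Drill's own allocator instead of wrapping Netty's EmptyByteBuf, so every vector buffer flows through the same accounting. A condensed sketch of the two paths, with names as in the hunk above:

      if (fmd.getValueCount() == 0 && (!fmd.hasGroupCount() || fmd.getGroupCount() == 0)) {
        // Empty vector: a 0-byte DrillBuf from the allocator, not an EmptyByteBuf.
        v.load(fmd, allocator.buffer(0));
      } else {
        // Non-empty vector: hand the vector its slice of the incoming batch buffer.
        v.load(fmd, buf.slice(bufOffset, fmd.getBufferLength()));
      }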
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/5c5cef06/exec/java-exec/src/main/java/org/apache/drill/exec/record/TypedFieldId.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/record/TypedFieldId.java b/exec/java-exec/src/main/java/org/apache/drill/exec/record/TypedFieldId.java
index 22aa731..3154e6e 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/record/TypedFieldId.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/record/TypedFieldId.java
@@ -231,5 +231,14 @@ public class TypedFieldId {
     return true;
   }
 
+  @Override
+  public String toString() {
+    final int maxLen = 10;
+    return "TypedFieldId [fieldIds="
+        + (fieldIds != null ? Arrays.toString(Arrays.copyOf(fieldIds, Math.min(fieldIds.length, maxLen))) : null)
+        + ", remainder=" + remainder + "]";
+  }
+
+
 
 }
\ No newline at end of file

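Note: the new toString caps array output at ten elements, a common idiom for keeping log lines readable when fieldIds can be long. A small standalone illustration with made-up values:

import java.util.Arrays;

public class TruncatedToStringDemo {
  public static void main(String[] args) {
    int[] fieldIds = {3, 1, 4, 1, 5, 9, 2, 6, 5, 3, 5, 8};
    final int maxLen = 10;
    // Copy at most maxLen elements before printing.
    String shown = Arrays.toString(Arrays.copyOf(fieldIds, Math.min(fieldIds.length, maxLen)));
    System.out.println(shown); // [3, 1, 4, 1, 5, 9, 2, 6, 5, 3]
  }
}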
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/5c5cef06/exec/java-exec/src/main/java/org/apache/drill/exec/record/WritableBatch.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/record/WritableBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/record/WritableBatch.java
index b55ed82..3810115 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/record/WritableBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/record/WritableBatch.java
@@ -17,8 +17,7 @@
  */
 package org.apache.drill.exec.record;
 
-import io.netty.buffer.ByteBuf;
-import io.netty.buffer.CompositeByteBuf;
+import io.netty.buffer.DrillBuf;
 
 import java.util.List;
 
@@ -37,15 +36,15 @@ public class WritableBatch {
  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(WritableBatch.class);
 
   private final RecordBatchDef def;
-  private final ByteBuf[] buffers;
+  private final DrillBuf[] buffers;
   private boolean cleared = false;
 
-  private WritableBatch(RecordBatchDef def, List<ByteBuf> buffers) {
+  private WritableBatch(RecordBatchDef def, List<DrillBuf> buffers) {
     this.def = def;
-    this.buffers = buffers.toArray(new ByteBuf[buffers.size()]);
+    this.buffers = buffers.toArray(new DrillBuf[buffers.size()]);
   }
 
-  private WritableBatch(RecordBatchDef def, ByteBuf[] buffers) {
+  private WritableBatch(RecordBatchDef def, DrillBuf[] buffers) {
     super();
     this.def = def;
     this.buffers = buffers;
@@ -55,20 +54,27 @@ public class WritableBatch {
     return def;
   }
 
-  public ByteBuf[] getBuffers() {
+  public DrillBuf[] getBuffers() {
     return buffers;
   }
 
   public void reconstructContainer(VectorContainer container) {
     Preconditions.checkState(!cleared,
         "Attempted to reconstruct a container from a WritableBatch after it 
had been cleared");
-    if (buffers.length > 0) { /* If we have ByteBuf's associated with value vectors */
+    if (buffers.length > 0) { /* If we have DrillBuf's associated with value vectors */
+      int len = 0;
+      for(DrillBuf b : buffers){
+        len += b.capacity();
+      }
 
-      CompositeByteBuf cbb = new CompositeByteBuf(buffers[0].alloc(), true, buffers.length);
+      DrillBuf newBuf = buffers[0].getAllocator().buffer(len);
 
       /* Copy data from each buffer into the compound buffer */
-      for (ByteBuf buf : buffers) {
-        cbb.addComponent(buf);
+      int offset = 0;
+      for (DrillBuf buf : buffers) {
+        newBuf.setBytes(offset, buf);
+        offset += buf.capacity();
+        buf.release();
       }
 
       List<SerializedField> fields = def.getFieldList();
@@ -83,7 +89,7 @@ public class WritableBatch {
       for (VectorWrapper<?> vv : container) {
         SerializedField fmd = fields.get(vectorIndex);
         ValueVector v = vv.getValueVector();
-        ByteBuf bb = cbb.slice(bufferOffset, fmd.getBufferLength());
+        DrillBuf bb = newBuf.slice(bufferOffset, fmd.getBufferLength());
 //        v.load(fmd, cbb.slice(bufferOffset, fmd.getBufferLength()));
         v.load(fmd, bb);
         bb.release();
@@ -109,7 +115,7 @@ public class WritableBatch {
 
   public void clear() {
     if(cleared) return;
-    for (ByteBuf buf : buffers) {
+    for (DrillBuf buf : buffers) {
       buf.release();
     }
     cleared = true;
@@ -125,7 +131,7 @@ public class WritableBatch {
   }
 
  public static WritableBatch getBatchNoHV(int recordCount, Iterable<ValueVector> vectors, boolean isSV2) {
-    List<ByteBuf> buffers = Lists.newArrayList();
+    List<DrillBuf> buffers = Lists.newArrayList();
     List<SerializedField> metadata = Lists.newArrayList();
 
     for (ValueVector vv : vectors) {
@@ -137,7 +143,7 @@ public class WritableBatch {
         continue;
       }
 
-      for (ByteBuf b : vv.getBuffers()) {
+      for (DrillBuf b : vv.getBuffers()) {
         buffers.add(b);
       }
       // remove vv access to buffers.
@@ -159,13 +165,13 @@ public class WritableBatch {
   }
 
   public void retainBuffers() {
-    for (ByteBuf buf : buffers) {
+    for (DrillBuf buf : buffers) {
       buf.retain();
     }
   }
-  
+
   public void retainBuffers(int increment) {
-    for (ByteBuf buf : buffers) {
+    for (DrillBuf buf : buffers) {
       buf.retain(increment);
     }
   }

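Note: reconstructContainer now materializes one contiguous DrillBuf rather than presenting the pieces through a CompositeByteBuf view, presumably trading one extra copy for simpler ownership, since each source is released as soon as it is copied. The consolidation step from the hunk, condensed:

      // Size the destination as the sum of all source capacities.
      int len = 0;
      for (DrillBuf b : buffers) {
        len += b.capacity();
      }
      DrillBuf newBuf = buffers[0].getAllocator().buffer(len);

      // Copy each source into place, then release it; vectors are later
      // loaded from slices of the single consolidated buffer.
      int offset = 0;
      for (DrillBuf buf : buffers) {
        newBuf.setBytes(offset, buf);
        offset += buf.capacity();
        buf.release();
      }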
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/5c5cef06/exec/java-exec/src/main/java/org/apache/drill/exec/record/selection/SelectionVector2.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/record/selection/SelectionVector2.java b/exec/java-exec/src/main/java/org/apache/drill/exec/record/selection/SelectionVector2.java
index af1e2b6..038bb2f 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/record/selection/SelectionVector2.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/record/selection/SelectionVector2.java
@@ -17,7 +17,7 @@
  */
 package org.apache.drill.exec.record.selection;
 
-import io.netty.buffer.ByteBuf;
+import io.netty.buffer.DrillBuf;
 
 import java.io.Closeable;
 import java.io.IOException;
@@ -33,7 +33,7 @@ public class SelectionVector2 implements Closeable{
 
   private final BufferAllocator allocator;
   private int recordCount;
-  private ByteBuf buffer = DeadBuf.DEAD_BUFFER;
+  private DrillBuf buffer = DeadBuf.DEAD_BUFFER;
 
   public static final int RECORD_SIZE = 2;
 
@@ -45,9 +45,9 @@ public class SelectionVector2 implements Closeable{
     return recordCount;
   }
 
-  public ByteBuf getBuffer()
+  public DrillBuf getBuffer()
   {
-      ByteBuf bufferHandle = this.buffer;
+      DrillBuf bufferHandle = this.buffer;
 
       /* Increment the ref count for this buffer */
       bufferHandle.retain();
@@ -60,7 +60,7 @@ public class SelectionVector2 implements Closeable{
       return bufferHandle;
   }
 
-  public void setBuffer(ByteBuf bufferHandle)
+  public void setBuffer(DrillBuf bufferHandle)
   {
       /* clear the existing buffer */
       clear();
@@ -70,7 +70,6 @@ public class SelectionVector2 implements Closeable{
   }
 
   public char getIndex(int index){
-    
     return buffer.getChar(index * RECORD_SIZE);
   }
 
@@ -78,6 +77,14 @@ public class SelectionVector2 implements Closeable{
     buffer.setChar(index * RECORD_SIZE, value);
   }
 
+  public long getDataAddr(){
+    return buffer.memoryAddress();
+  }
+
+  public void setIndex(int index, int value){
+    buffer.setChar(index, value);
+  }
+
   public boolean allocateNew(int size){
     clear();
     buffer = allocator.buffer(size * RECORD_SIZE);
@@ -100,7 +107,7 @@ public class SelectionVector2 implements Closeable{
     clear();
     return newSV;
   }
-  
+
   public void clear() {
     if (buffer != null && buffer != DeadBuf.DEAD_BUFFER) {
       buffer.release();
@@ -108,7 +115,7 @@ public class SelectionVector2 implements Closeable{
       recordCount = 0;
     }
   }
-  
+
   public void setRecordCount(int recordCount){
 //    logger.debug("Seting record count to {}", recordCount);
     this.recordCount = recordCount;

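Note: alongside the DrillBuf migration, SelectionVector2 gains getDataAddr() for direct-memory readers and a setIndex(int, int) overload that, unlike setIndex(int, char), passes the offset straight to setChar without scaling by RECORD_SIZE. A hedged usage sketch of the API above; the allocator-taking constructor is assumed from the field shown in the diff:

    SelectionVector2 sv2 = new SelectionVector2(allocator);
    sv2.allocateNew(4);                  // room for four 2-byte entries
    for (int i = 0; i < 4; i++) {
      sv2.setIndex(i, (char) i);         // char overload scales: offset = i * RECORD_SIZE
    }
    sv2.setRecordCount(4);
    char third = sv2.getIndex(2);        // == 2
    long addr = sv2.getDataAddr();       // raw address of the backing DrillBuf
    sv2.clear();                         // releases the buffer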
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/5c5cef06/exec/java-exec/src/main/java/org/apache/drill/exec/resolver/TypeCastRules.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/resolver/TypeCastRules.java b/exec/java-exec/src/main/java/org/apache/drill/exec/resolver/TypeCastRules.java
index 8176567..93f5b6d 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/resolver/TypeCastRules.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/resolver/TypeCastRules.java
@@ -30,9 +30,9 @@ import org.apache.drill.common.types.Types;
 import org.apache.drill.common.types.TypeProtos.DataMode;
 import org.apache.drill.common.types.TypeProtos.MajorType;
 import org.apache.drill.common.types.TypeProtos.MinorType;
-import org.apache.drill.common.util.DecimalUtility;
 import org.apache.drill.exec.expr.annotations.FunctionTemplate.NullHandling;
 import org.apache.drill.exec.expr.fn.DrillFuncHolder;
+import org.apache.drill.exec.util.DecimalUtility;
 
 public class TypeCastRules {
 

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/5c5cef06/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/ProtobufLengthDecoder.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/ProtobufLengthDecoder.java b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/ProtobufLengthDecoder.java
index 473e3e6..02fb75e 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/ProtobufLengthDecoder.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/ProtobufLengthDecoder.java
@@ -18,8 +18,6 @@
 package org.apache.drill.exec.rpc;
 
 import io.netty.buffer.ByteBuf;
-import io.netty.buffer.SwappedByteBuf;
-import io.netty.buffer.Unpooled;
 import io.netty.channel.ChannelHandlerContext;
 import io.netty.handler.codec.ByteToMessageDecoder;
 import io.netty.handler.codec.CorruptedFrameException;
@@ -29,7 +27,6 @@ import java.util.List;
 import org.apache.drill.exec.memory.BufferAllocator;
 
 import com.google.protobuf.CodedInputStream;
-import org.apache.drill.exec.proto.GeneralRPCProtos.RpcMode;
 
 /**
  * Modified version of ProtobufVarint32FrameDecoder that avoids bytebuf copy.
@@ -39,7 +36,7 @@ public class ProtobufLengthDecoder extends ByteToMessageDecoder {
 
   private BufferAllocator allocator;
   private OutOfMemoryHandler outOfMemoryHandler;
-  
+
  public ProtobufLengthDecoder(BufferAllocator allocator, OutOfMemoryHandler outOfMemoryHandler) {
     super();
     this.allocator = allocator;
@@ -90,14 +87,14 @@ public class ProtobufLengthDecoder extends ByteToMessageDecoder {
             return;
           }
           outBuf.writeBytes(in, in.readerIndex(), length);
-          
+
           in.skipBytes(length);
-          
+
           if (RpcConstants.EXTRA_DEBUGGING)
             logger.debug(String.format(
                 "ReaderIndex is %d after length header of %d bytes and frame 
body of length %d bytes.",
                 in.readerIndex(), i + 1, length));
-          
+
           out.add(outBuf);
           return;
         }

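Note for context: this decoder reads a protobuf varint32 length header, then copies exactly that many bytes into outBuf and skips them in the input. A self-contained sketch of varint32 decoding as protobuf defines it (helper name illustrative; the class itself parses via CodedInputStream):

import io.netty.buffer.ByteBuf;
import io.netty.handler.codec.CorruptedFrameException;

final class Varint32 {
  // Base-128 varint: 7 payload bits per byte, the high bit flags continuation.
  static int read(ByteBuf in) {
    int result = 0;
    for (int shift = 0; shift < 32; shift += 7) {
      byte b = in.readByte();
      result |= (b & 0x7F) << shift;
      if ((b & 0x80) == 0) {
        return result;
      }
    }
    throw new CorruptedFrameException("length wider than 32-bit");
  }
}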
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/5c5cef06/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/data/DataResponseHandler.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/data/DataResponseHandler.java b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/data/DataResponseHandler.java
index f1c94d3..d4a73c3 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/data/DataResponseHandler.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/data/DataResponseHandler.java
@@ -17,11 +17,10 @@
  */
 package org.apache.drill.exec.rpc.data;
 
-import io.netty.buffer.ByteBuf;
+import io.netty.buffer.DrillBuf;
 
 import org.apache.drill.exec.proto.BitData.FragmentRecordBatch;
 import org.apache.drill.exec.rpc.RemoteConnection;
-import org.apache.drill.exec.rpc.Response;
 import org.apache.drill.exec.rpc.ResponseSender;
 import org.apache.drill.exec.rpc.RpcException;
 import org.apache.drill.exec.work.fragment.FragmentManager;
@@ -29,7 +28,7 @@ import org.apache.drill.exec.work.fragment.FragmentManager;
 public interface DataResponseHandler {
 
  public void handle(RemoteConnection connection, FragmentManager manager, FragmentRecordBatch fragmentBatch,
-      ByteBuf data, ResponseSender responder) throws RpcException;
+      DrillBuf data, ResponseSender responder) throws RpcException;
 
   public void informOutOfMemory();
 }

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/5c5cef06/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/data/DataResponseHandlerImpl.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/data/DataResponseHandlerImpl.java b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/data/DataResponseHandlerImpl.java
index 5d71c90..c37550f 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/data/DataResponseHandlerImpl.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/data/DataResponseHandlerImpl.java
@@ -17,7 +17,7 @@
  */
 package org.apache.drill.exec.rpc.data;
 
-import io.netty.buffer.ByteBuf;
+import io.netty.buffer.DrillBuf;
 
 import org.apache.drill.exec.exception.FragmentSetupException;
 import org.apache.drill.exec.proto.BitData.FragmentRecordBatch;
@@ -48,7 +48,7 @@ public class DataResponseHandlerImpl implements DataResponseHandler{
   }
 
 
-  public void handle(RemoteConnection connection, FragmentManager manager, FragmentRecordBatch fragmentBatch, ByteBuf data, ResponseSender sender) throws RpcException {
+  public void handle(RemoteConnection connection, FragmentManager manager, FragmentRecordBatch fragmentBatch, DrillBuf data, ResponseSender sender) throws RpcException {
 //    logger.debug("Fragment Batch received {}", fragmentBatch);
     try {
      boolean canRun = manager.handle(new RawFragmentBatch(connection, fragmentBatch, data, sender));

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/5c5cef06/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/data/DataServer.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/data/DataServer.java b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/data/DataServer.java
index 42dee94..8611b5a 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/data/DataServer.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/data/DataServer.java
@@ -17,7 +17,7 @@
  */
 package org.apache.drill.exec.rpc.data;
 
-import io.netty.buffer.AccountingByteBuf;
+import io.netty.buffer.DrillBuf;
 import io.netty.buffer.ByteBuf;
 import io.netty.channel.Channel;
 import io.netty.channel.ChannelFuture;
@@ -114,11 +114,11 @@ public class DataServer extends BasicServer<RpcType, BitServerConnection> {
       }
       BufferAllocator allocator = manager.getFragmentContext().getAllocator();
       if(body != null){
-        if(!allocator.takeOwnership((AccountingByteBuf) body.unwrap())){
+        if(!allocator.takeOwnership((DrillBuf) body.unwrap())){
           dataHandler.handle(connection, manager, OOM_FRAGMENT, null, null);
         }
       }
-      dataHandler.handle(connection, manager, fragmentBatch, body, sender);
+      dataHandler.handle(connection, manager, fragmentBatch, (DrillBuf) body, sender);
 
     } catch (FragmentSetupException e) {
      logger.error("Failure while getting fragment manager. {}", QueryIdHelper.getQueryIdentifier(handle),  e);

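Note: the hunk moves accounting for the incoming network buffer onto the receiving fragment's allocator before the hand-off; OOM_FRAGMENT appears to be a pre-built header used to signal out-of-memory downstream. The flow, restated with names from the diff:

      BufferAllocator allocator = manager.getFragmentContext().getAllocator();
      if (body != null) {
        // Transfer the buffer's memory accounting from the RPC layer to the
        // fragment; if the fragment's allocator cannot absorb it, signal OOM.
        if (!allocator.takeOwnership((DrillBuf) body.unwrap())) {
          dataHandler.handle(connection, manager, OOM_FRAGMENT, null, null);
        }
      }
      dataHandler.handle(connection, manager, fragmentBatch, (DrillBuf) body, sender);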
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/5c5cef06/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/QueryResultBatch.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/QueryResultBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/QueryResultBatch.java
index 98e5943..e36a1c6 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/QueryResultBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/QueryResultBatch.java
@@ -17,32 +17,33 @@
  */
 package org.apache.drill.exec.rpc.user;
 
-import io.netty.buffer.ByteBuf;
+import io.netty.buffer.DrillBuf;
 
 import org.apache.drill.exec.proto.UserBitShared.QueryResult;
 
 public class QueryResultBatch {
  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(QueryResultBatch.class);
-  
+
   private final QueryResult header;
-  private final ByteBuf data;
-  
-  public QueryResultBatch(QueryResult header, ByteBuf data) {
+  private final DrillBuf data;
+
+  public QueryResultBatch(QueryResult header, DrillBuf data) {
 //    logger.debug("New Result Batch with header {} and data {}", header, 
data);
     this.header = header;
     this.data = data;
     if(this.data != null) data.retain();
   }
 
+
   public QueryResult getHeader() {
     return header;
   }
 
-  public ByteBuf getData() {
+  public DrillBuf getData() {
     return data;
   }
-  
-  
+
+
   public boolean hasData(){
     return data != null;
   }
@@ -50,12 +51,12 @@ public class QueryResultBatch {
   public void release(){
     if(data != null) data.release();
   }
-  
+
   @Override
   public String toString() {
     return "QueryResultBatch [header=" + header + ", data=" + data + "]";
   }
-  
-  
-  
+
+
+
 }

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/5c5cef06/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/QueryResultHandler.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/QueryResultHandler.java b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/QueryResultHandler.java
index 95b7446..4bbb13e 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/QueryResultHandler.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/QueryResultHandler.java
@@ -18,6 +18,7 @@
 package org.apache.drill.exec.rpc.user;
 
 import io.netty.buffer.ByteBuf;
+import io.netty.buffer.DrillBuf;
 
 import java.util.concurrent.ConcurrentLinkedQueue;
 import java.util.concurrent.ConcurrentMap;
@@ -55,7 +56,7 @@ public class QueryResultHandler {
 
  public void batchArrived(ConnectionThrottle throttle, ByteBuf pBody, ByteBuf dBody) throws RpcException {
     final QueryResult result = RpcBus.get(pBody, QueryResult.PARSER);
-    final QueryResultBatch batch = new QueryResultBatch(result, dBody);
+    final QueryResultBatch batch = new QueryResultBatch(result, (DrillBuf) dBody);
    final boolean failed = (batch.getHeader().getQueryState() == QueryState.FAILED);
 
    assert failed || batch.getHeader().getErrorCount() == 0 : "Error count for the query batch is non-zero but QueryState != FAILED";

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/5c5cef06/exec/java-exec/src/main/java/org/apache/drill/exec/server/DrillbitContext.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/server/DrillbitContext.java b/exec/java-exec/src/main/java/org/apache/drill/exec/server/DrillbitContext.java
index eed0126..7d48711 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/server/DrillbitContext.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/server/DrillbitContext.java
@@ -23,6 +23,7 @@ import java.util.Collection;
 
 import org.apache.drill.common.config.DrillConfig;
 import org.apache.drill.exec.cache.DistributedCache;
+import org.apache.drill.exec.compile.CodeCompiler;
 import org.apache.drill.exec.coord.ClusterCoordinator;
 import org.apache.drill.exec.expr.fn.FunctionImplementationRegistry;
 import org.apache.drill.exec.memory.BufferAllocator;
@@ -57,6 +58,7 @@ public class DrillbitContext {
   private final FunctionImplementationRegistry functionRegistry;
   private final SystemOptionManager systemOptions;
   private final PStoreProvider provider;
+  private final CodeCompiler compiler;
 
  public DrillbitContext(DrillbitEndpoint endpoint, BootStrapContext context, ClusterCoordinator coord, Controller controller, DataConnectionCreator connectionsPool, DistributedCache cache, WorkEventBus workBus, PStoreProvider provider) {
     super();
@@ -77,8 +79,7 @@ public class DrillbitContext {
     this.operatorCreatorRegistry = new OperatorCreatorRegistry(context.getConfig());
     this.functionRegistry = new FunctionImplementationRegistry(context.getConfig());
     this.systemOptions = new SystemOptionManager(context.getConfig(), provider);
-
-//    this.globalDrillOptions = new DistributedGlobalOptions(this.cache);
+    this.compiler = new CodeCompiler(context.getConfig(), cache, systemOptions);
   }
 
   public FunctionImplementationRegistry getFunctionImplementationRegistry() {
@@ -153,4 +154,10 @@ public class DrillbitContext {
     return coord;
   }
 
+  public CodeCompiler getCompiler() {
+    return compiler;
+  }
+
+
+
 }

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/5c5cef06/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SystemOptionManager.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SystemOptionManager.java b/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SystemOptionManager.java
index eddb818..b322086 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SystemOptionManager.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SystemOptionManager.java
@@ -99,9 +99,10 @@ public class SystemOptionManager implements OptionManager{
         .build();
   }
 
-  public void init() throws IOException{
+  public SystemOptionManager init() throws IOException{
     this.options = provider.getPStore(config);
     this.admin = new SystemOptionAdmin();
+    return this;
   }
 
   private class Iter implements Iterator<OptionValue>{

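Note: returning this from init() allows construction and initialization in a single expression; the checked IOException still propagates to the caller. A likely call site, with constructor arguments as used in DrillbitContext above:

    SystemOptionManager systemOptions =
        new SystemOptionManager(context.getConfig(), provider).init();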
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/5c5cef06/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/JSONRecordReader2.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/JSONRecordReader2.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/JSONRecordReader2.java
index cd3cfdc..5d741e0 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/JSONRecordReader2.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/JSONRecordReader2.java
@@ -59,7 +59,7 @@ public class JSONRecordReader2 implements RecordReader{
                          List<SchemaPath> columns) throws OutOfMemoryException {
     this.hadoopPath = new Path(inputPath);
     this.fileSystem = fileSystem;
-    this.fragmentContext=fragmentContext;
+    this.fragmentContext = fragmentContext;
   }
 
   @Override
@@ -69,9 +69,9 @@ public class JSONRecordReader2 implements RecordReader{
       JsonRecordSplitter splitter = new UTF8JsonRecordSplitter(stream);
       this.writer = new VectorContainerWriter(output);
       this.mutator = output;
-      jsonReader = new JsonReaderWithState(splitter);
+      jsonReader = new JsonReaderWithState(splitter, fragmentContext.getManagedBuffer());
     }catch(Exception e){
-      handleAndRaise("Failure reading JSON file.", e);
+      throw new ExecutionSetupException("Failure reading JSON file.", e);
     }
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/5c5cef06/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/FixedByteAlignedReader.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/FixedByteAlignedReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/FixedByteAlignedReader.java
index 5c25e11..54771e4 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/FixedByteAlignedReader.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/FixedByteAlignedReader.java
@@ -18,16 +18,19 @@
 package org.apache.drill.exec.store.parquet.columnreaders;
 
 import io.netty.buffer.ByteBuf;
+import io.netty.buffer.DrillBuf;
+
 import org.apache.drill.common.exceptions.ExecutionSetupException;
-import org.apache.drill.common.util.DecimalUtility;
 import org.apache.drill.exec.expr.holders.Decimal28SparseHolder;
 import org.apache.drill.exec.expr.holders.Decimal38SparseHolder;
 import org.apache.drill.exec.store.ParquetOutputRecordWriter;
+import org.apache.drill.exec.util.DecimalUtility;
 import org.apache.drill.exec.vector.DateVector;
 import org.apache.drill.exec.vector.Decimal28SparseVector;
 import org.apache.drill.exec.vector.Decimal38SparseVector;
 import org.apache.drill.exec.vector.ValueVector;
 import org.joda.time.DateTimeUtils;
+
 import parquet.column.ColumnDescriptor;
 import parquet.format.SchemaElement;
 import parquet.hadoop.metadata.ColumnChunkMetaData;
@@ -36,9 +39,9 @@ import java.math.BigDecimal;
 
 class FixedByteAlignedReader extends ColumnReader {
 
-  protected ByteBuf bytebuf;
+  protected DrillBuf bytebuf;
+
 
-  
   FixedByteAlignedReader(ParquetRecordReader parentReader, int allocateSize, ColumnDescriptor descriptor, ColumnChunkMetaData columnChunkMetaData,
                          boolean fixedLength, ValueVector v, SchemaElement schemaElement) throws ExecutionSetupException {
     super(parentReader, allocateSize, descriptor, columnChunkMetaData, fixedLength, v, schemaElement);
@@ -120,7 +123,7 @@ class FixedByteAlignedReader extends ColumnReader {
     @Override
     void addNext(int start, int index) {
       int width = Decimal28SparseHolder.WIDTH;
-      BigDecimal intermediate = DecimalUtility.getBigDecimalFromByteBuf(bytebuf, start, dataTypeLengthInBytes, schemaElement.getScale());
+      BigDecimal intermediate = DecimalUtility.getBigDecimalFromDrillBuf(bytebuf, start, dataTypeLengthInBytes, schemaElement.getScale());
       DecimalUtility.getSparseFromBigDecimal(intermediate, decimal28Vector.getData(), index * width, schemaElement.getScale(),
               schemaElement.getPrecision(), Decimal28SparseHolder.nDecimalDigits);
     }
@@ -139,7 +142,7 @@ class FixedByteAlignedReader extends ColumnReader {
     @Override
     void addNext(int start, int index) {
       int width = Decimal38SparseHolder.WIDTH;
-      BigDecimal intermediate = DecimalUtility.getBigDecimalFromByteBuf(bytebuf, start, dataTypeLengthInBytes, schemaElement.getScale());
+      BigDecimal intermediate = DecimalUtility.getBigDecimalFromDrillBuf(bytebuf, start, dataTypeLengthInBytes, schemaElement.getScale());
       DecimalUtility.getSparseFromBigDecimal(intermediate, decimal38Vector.getData(), index * width, schemaElement.getScale(),
               schemaElement.getPrecision(), Decimal38SparseHolder.nDecimalDigits);
     }

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/5c5cef06/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/NullableFixedByteAlignedReaders.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/NullableFixedByteAlignedReaders.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/NullableFixedByteAlignedReaders.java
index 4d7f312..f88d56a 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/NullableFixedByteAlignedReaders.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/NullableFixedByteAlignedReaders.java
@@ -18,8 +18,10 @@
 package org.apache.drill.exec.store.parquet.columnreaders;
 
 import io.netty.buffer.ByteBuf;
+import io.netty.buffer.DrillBuf;
+
 import org.apache.drill.common.exceptions.ExecutionSetupException;
-import org.apache.drill.common.util.DecimalUtility;
+import org.apache.drill.exec.util.DecimalUtility;
 import org.apache.drill.exec.vector.NullableBigIntVector;
 import org.apache.drill.exec.vector.NullableFloat4Vector;
 import org.apache.drill.exec.vector.NullableFloat8Vector;
@@ -31,8 +33,8 @@ import org.apache.drill.exec.store.ParquetOutputRecordWriter;
 import org.apache.drill.exec.vector.NullableDateVector;
 import org.apache.drill.exec.vector.NullableDecimal28SparseVector;
 import org.apache.drill.exec.vector.NullableDecimal38SparseVector;
-
 import org.joda.time.DateTimeUtils;
+
 import parquet.column.ColumnDescriptor;
 import parquet.format.SchemaElement;
 import parquet.hadoop.metadata.ColumnChunkMetaData;
@@ -42,7 +44,7 @@ import java.math.BigDecimal;
 public class NullableFixedByteAlignedReaders {
 
   static class NullableFixedByteAlignedReader extends NullableColumnReader {
-    protected ByteBuf bytebuf;
+    protected DrillBuf bytebuf;
 
    NullableFixedByteAlignedReader(ParquetRecordReader parentReader, int allocateSize, ColumnDescriptor descriptor,
                      ColumnChunkMetaData columnChunkMetaData, boolean fixedLength, ValueVector v, SchemaElement schemaElement) throws ExecutionSetupException {
@@ -67,8 +69,6 @@ public class NullableFixedByteAlignedReaders {
 
  static class NullableDictionaryIntReader extends NullableColumnReader<NullableIntVector> {
 
-    private ByteBuf bytebuf;
-
    NullableDictionaryIntReader(ParquetRecordReader parentReader, int allocateSize, ColumnDescriptor descriptor,
                                ColumnChunkMetaData columnChunkMetaData, boolean fixedLength, NullableIntVector v,
                                SchemaElement schemaElement) throws ExecutionSetupException {
@@ -88,8 +88,6 @@ public class NullableFixedByteAlignedReaders {
 
  static class NullableDictionaryBigIntReader extends NullableColumnReader<NullableBigIntVector> {
 
-    private ByteBuf bytebuf;
-
    NullableDictionaryBigIntReader(ParquetRecordReader parentReader, int allocateSize, ColumnDescriptor descriptor,
                                   ColumnChunkMetaData columnChunkMetaData, boolean fixedLength, NullableBigIntVector v,
                                   SchemaElement schemaElement) throws ExecutionSetupException {
@@ -107,8 +105,6 @@ public class NullableFixedByteAlignedReaders {
 
  static class NullableDictionaryFloat4Reader extends NullableColumnReader<NullableFloat4Vector> {
 
-    private ByteBuf bytebuf;
-
    NullableDictionaryFloat4Reader(ParquetRecordReader parentReader, int allocateSize, ColumnDescriptor descriptor,
                                   ColumnChunkMetaData columnChunkMetaData, boolean fixedLength, NullableFloat4Vector v,
                                   SchemaElement schemaElement) throws ExecutionSetupException {
@@ -126,8 +122,6 @@ public class NullableFixedByteAlignedReaders {
 
  static class NullableDictionaryFloat8Reader extends NullableColumnReader<NullableFloat8Vector> {
 
-    private ByteBuf bytebuf;
-
    NullableDictionaryFloat8Reader(ParquetRecordReader parentReader, int allocateSize, ColumnDescriptor descriptor,
                                  ColumnChunkMetaData columnChunkMetaData, boolean fixedLength, NullableFloat8Vector v,
                                  SchemaElement schemaElement) throws ExecutionSetupException {
@@ -211,7 +205,7 @@ public class NullableFixedByteAlignedReaders {
     @Override
     void addNext(int start, int index) {
       int width = NullableDecimal28SparseHolder.WIDTH;
-      BigDecimal intermediate = DecimalUtility.getBigDecimalFromByteBuf(bytebuf, start, dataTypeLengthInBytes, schemaElement.getScale());
+      BigDecimal intermediate = DecimalUtility.getBigDecimalFromDrillBuf(bytebuf, start, dataTypeLengthInBytes, schemaElement.getScale());
       DecimalUtility.getSparseFromBigDecimal(intermediate, decimal28Vector.getData(), index * width, schemaElement.getScale(),
           schemaElement.getPrecision(), NullableDecimal28SparseHolder.nDecimalDigits);
     }
@@ -230,7 +224,7 @@ public class NullableFixedByteAlignedReaders {
     @Override
     void addNext(int start, int index) {
       int width = NullableDecimal38SparseHolder.WIDTH;
-      BigDecimal intermediate = DecimalUtility.getBigDecimalFromByteBuf(bytebuf, start, dataTypeLengthInBytes, schemaElement.getScale());
+      BigDecimal intermediate = DecimalUtility.getBigDecimalFromDrillBuf(bytebuf, start, dataTypeLengthInBytes, schemaElement.getScale());
       DecimalUtility.getSparseFromBigDecimal(intermediate, decimal38Vector.getData(), index * width, schemaElement.getScale(),
           schemaElement.getPrecision(), NullableDecimal38SparseHolder.nDecimalDigits);
     }

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/5c5cef06/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/NullableVarLengthValuesColumn.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/NullableVarLengthValuesColumn.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/NullableVarLengthValuesColumn.java
index 83812ea..ba9ff80 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/NullableVarLengthValuesColumn.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/NullableVarLengthValuesColumn.java
@@ -18,8 +18,11 @@
 package org.apache.drill.exec.store.parquet.columnreaders;
 
 import io.netty.buffer.ByteBuf;
+import io.netty.buffer.DrillBuf;
+
 import org.apache.drill.common.exceptions.ExecutionSetupException;
 import org.apache.drill.exec.vector.ValueVector;
+
 import parquet.bytes.BytesUtils;
 import parquet.column.ColumnDescriptor;
 import parquet.format.SchemaElement;
@@ -38,7 +41,7 @@ public abstract class NullableVarLengthValuesColumn<V extends ValueVector> exten
     super(parentReader, allocateSize, descriptor, columnChunkMetaData, 
fixedLength, v, schemaElement);
   }
 
-  public abstract boolean setSafe(int index, ByteBuf value, int start, int length);
+  public abstract boolean setSafe(int index, DrillBuf value, int start, int length);
 
   public abstract int capacity();
 
@@ -82,8 +85,7 @@ public abstract class NullableVarLengthValuesColumn<V extends ValueVector> exten
     }
     else {
      // re-purposing this field here for length in BYTES to prevent repetitive multiplication/division
-      dataTypeLengthInBits = BytesUtils.readIntLittleEndian(pageReader.pageDataByteArray.nioBuffer(),
-          (int) pageReader.readyToReadPosInBytes);
+      dataTypeLengthInBits = pageReader.pageDataByteArray.getInt((int) pageReader.readyToReadPosInBytes);
     }
     // I think this also needs to happen if it is null for the random access
    if (! variableWidthVector.getMutator().setValueLengthSafe((int) valuesReadInCurrentPass + pageReader.valuesReadyToRead, dataTypeLengthInBits)) {

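Note: reading the length header via pageDataByteArray.getInt(...) replaces the BytesUtils/nioBuffer round trip with a direct buffer read; like the little-endian helper it replaces, DrillBuf reads are presumably little-endian here, matching Parquet's on-disk layout. The changed line, condensed:

      // Read the 4-byte value length directly from the page buffer.
      dataTypeLengthInBits = pageReader.pageDataByteArray.getInt((int) pageReader.readyToReadPosInBytes);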
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/5c5cef06/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/PageReader.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/PageReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/PageReader.java
index 1687c3b..4165cbd 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/PageReader.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/PageReader.java
@@ -22,7 +22,9 @@ import java.util.ArrayList;
 import java.util.List;
 
 import io.netty.buffer.ByteBuf;
+import io.netty.buffer.DrillBuf;
 import io.netty.buffer.Unpooled;
+
 import org.apache.drill.common.exceptions.DrillRuntimeException;
 import org.apache.drill.common.exceptions.ExecutionSetupException;
 import org.apache.drill.exec.store.parquet.ColumnDataReader;
@@ -54,7 +56,7 @@ final class PageReader {
  // store references to the pages that have been uncompressed, but not copied to ValueVectors yet
   Page currentPage;
   // buffer to store bytes of current page
-  ByteBuf pageDataByteArray;
+  DrillBuf pageDataByteArray;
 
  // for variable length data we need to keep track of our current position in the page data
  // as the values and lengths are intermixed, making random access to the length data impossible
@@ -134,10 +136,10 @@ final class PageReader {
      throw new ExecutionSetupException("Error opening or reading metadata for parquet file at location: "
         + path.getName(), e);
     }
-    
+
   }
 
-  
+
   /**
    * Grab the next page.
    *
@@ -226,7 +228,7 @@ final class PageReader {
       return false;
     }
 
-    pageDataByteArray = Unpooled.wrappedBuffer(currentPage.getBytes().toByteBuffer());
+    pageDataByteArray = DrillBuf.wrapByteBuffer(currentPage.getBytes().toByteBuffer());
 
     readPosInBytes = 0;
     if (parentColumnReader.getColumnDescriptor().getMaxRepetitionLevel() > 0) {
@@ -272,7 +274,7 @@ final class PageReader {
     readyToReadPosInBytes = readPosInBytes;
     return true;
   }
-  
+
   public void clear(){
     this.dataReader.clear();
    // Free all memory, including fixed length types. (Data is being copied for all types not just var length types)

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/5c5cef06/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/VarLengthColumnReaders.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/VarLengthColumnReaders.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/VarLengthColumnReaders.java
index 555cb94..ecfa110 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/VarLengthColumnReaders.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/VarLengthColumnReaders.java
@@ -17,18 +17,20 @@ ******************************************************************************/
 package org.apache.drill.exec.store.parquet.columnreaders;
 
-import java.math.BigDecimal;
-
 import io.netty.buffer.ByteBuf;
+import io.netty.buffer.DrillBuf;
 import io.netty.buffer.Unpooled;
+
+import java.math.BigDecimal;
+
 import org.apache.drill.common.exceptions.ExecutionSetupException;
-import org.apache.drill.common.util.DecimalUtility;
-import org.apache.drill.exec.expr.holders.VarCharHolder;
-import org.apache.drill.exec.expr.holders.NullableVarCharHolder;
-import org.apache.drill.exec.expr.holders.VarBinaryHolder;
-import org.apache.drill.exec.expr.holders.NullableVarBinaryHolder;
 import org.apache.drill.exec.expr.holders.Decimal28SparseHolder;
 import org.apache.drill.exec.expr.holders.Decimal38SparseHolder;
+import org.apache.drill.exec.expr.holders.NullableVarBinaryHolder;
+import org.apache.drill.exec.expr.holders.NullableVarCharHolder;
+import org.apache.drill.exec.expr.holders.VarBinaryHolder;
+import org.apache.drill.exec.expr.holders.VarCharHolder;
+import org.apache.drill.exec.util.DecimalUtility;
 import org.apache.drill.exec.vector.Decimal28SparseVector;
 import org.apache.drill.exec.vector.Decimal38SparseVector;
 import org.apache.drill.exec.vector.NullableDecimal28SparseVector;
@@ -37,6 +39,7 @@ import org.apache.drill.exec.vector.NullableVarBinaryVector;
 import org.apache.drill.exec.vector.NullableVarCharVector;
 import org.apache.drill.exec.vector.VarBinaryVector;
 import org.apache.drill.exec.vector.VarCharVector;
+
 import parquet.column.ColumnDescriptor;
 import parquet.format.SchemaElement;
 import parquet.hadoop.metadata.ColumnChunkMetaData;
@@ -56,9 +59,9 @@ public class VarLengthColumnReaders {
     }
 
     @Override
-    public boolean setSafe(int index, ByteBuf bytebuf, int start, int length) {
+    public boolean setSafe(int index, DrillBuf bytebuf, int start, int length) {
       int width = Decimal28SparseHolder.WIDTH;
-      BigDecimal intermediate = DecimalUtility.getBigDecimalFromByteBuf(bytebuf, start, length, schemaElement.getScale());
+      BigDecimal intermediate = DecimalUtility.getBigDecimalFromDrillBuf(bytebuf, start, length, schemaElement.getScale());
       if (index >= decimal28Vector.getValueCapacity()) {
         return false;
       }
@@ -85,9 +88,9 @@ public class VarLengthColumnReaders {
     }
 
     @Override
-    public boolean setSafe(int index, ByteBuf bytebuf, int start, int length) {
+    public boolean setSafe(int index, DrillBuf bytebuf, int start, int length) {
       int width = Decimal28SparseHolder.WIDTH;
-      BigDecimal intermediate = DecimalUtility.getBigDecimalFromByteBuf(bytebuf, start, length, schemaElement.getScale());
+      BigDecimal intermediate = DecimalUtility.getBigDecimalFromDrillBuf(bytebuf, start, length, schemaElement.getScale());
       if (index >= nullableDecimal28Vector.getValueCapacity()) {
         return false;
       }
@@ -115,9 +118,9 @@ public class VarLengthColumnReaders {
     }
 
     @Override
-    public boolean setSafe(int index, ByteBuf bytebuf, int start, int length) {
+    public boolean setSafe(int index, DrillBuf bytebuf, int start, int length) {
       int width = Decimal38SparseHolder.WIDTH;
-      BigDecimal intermediate = DecimalUtility.getBigDecimalFromByteBuf(bytebuf, start, length, schemaElement.getScale());
+      BigDecimal intermediate = DecimalUtility.getBigDecimalFromDrillBuf(bytebuf, start, length, schemaElement.getScale());
       if (index >= decimal28Vector.getValueCapacity()) {
         return false;
       }
@@ -144,9 +147,9 @@ public class VarLengthColumnReaders {
     }
 
     @Override
-    public boolean setSafe(int index, ByteBuf bytebuf, int start, int length) {
+    public boolean setSafe(int index, DrillBuf bytebuf, int start, int length) {
       int width = Decimal38SparseHolder.WIDTH;
-      BigDecimal intermediate = DecimalUtility.getBigDecimalFromByteBuf(bytebuf, start, length, schemaElement.getScale());
+      BigDecimal intermediate = DecimalUtility.getBigDecimalFromDrillBuf(bytebuf, start, length, schemaElement.getScale());
       if (index >= nullableDecimal38Vector.getValueCapacity()) {
         return false;
       }
@@ -176,12 +179,12 @@ public class VarLengthColumnReaders {
     }
 
     @Override
-    public boolean setSafe(int index, ByteBuf bytebuf, int start, int length) {
+    public boolean setSafe(int index, DrillBuf bytebuf, int start, int length) {
       boolean success;
       if(index >= varCharVector.getValueCapacity()) return false;
 
       if (usingDictionary) {
-        ByteBuf b = Unpooled.wrappedBuffer(currDictVal.toByteBuffer());
+        DrillBuf b = DrillBuf.wrapByteBuffer(currDictVal.toByteBuffer());
         int st=0;
         int len=currDictVal.length();
         VarCharHolder holder = new VarCharHolder();
@@ -211,44 +214,34 @@ public class VarLengthColumnReaders {
     int nullsRead;
     boolean currentValNull = false;
    // store a hard reference to the vector (which is also stored in the superclass) to prevent repetitive casting
-    protected NullableVarCharVector nullableVarCharVector;
+    protected final NullableVarCharVector.Mutator mutator;
+    private final NullableVarCharVector vector;
 
    NullableVarCharColumn(ParquetRecordReader parentReader, int allocateSize, ColumnDescriptor descriptor,
                          ColumnChunkMetaData columnChunkMetaData, boolean fixedLength, NullableVarCharVector v,
                          SchemaElement schemaElement) throws ExecutionSetupException {
      super(parentReader, allocateSize, descriptor, columnChunkMetaData, fixedLength, v, schemaElement);
-      nullableVarCharVector = v;
+      vector = v;
+      this.mutator = vector.getMutator();
     }
 
-    public boolean setSafe(int index, ByteBuf value, int start, int length) {
+    public boolean setSafe(int index, DrillBuf value, int start, int length) {
       boolean success;
-      if(index >= nullableVarCharVector.getValueCapacity()) return false;
+      if(index >= vector.getValueCapacity()) return false;
 
       if (usingDictionary) {
-        ByteBuf b = Unpooled.wrappedBuffer(currDictVal.toByteBuffer());
-        int st=0;
-        int len=currDictVal.length();
-        NullableVarCharHolder holder = new NullableVarCharHolder();
-        holder.buffer=b;
-        holder.start=0;
-        holder.end=currDictVal.length();
-        success = nullableVarCharVector.getMutator().setSafe(index, holder);
-        holder.isSet=1;
+        DrillBuf b = DrillBuf.wrapByteBuffer(currDictVal.toByteBuffer());
+        success = mutator.setSafe(index, 1, 0, currDictVal.length(), b);
       }
       else {
-        NullableVarCharHolder holder = new NullableVarCharHolder();
-        holder.buffer=value;
-        holder.start=start;
-        holder.end=start+length;
-        holder.isSet=1;
-        success = nullableVarCharVector.getMutator().setSafe(index, holder);
+        success = mutator.setSafe(index, 1, start, start+length, value);
       }
       return success;
     }
 
     @Override
     public int capacity() {
-      return nullableVarCharVector.getData().capacity();
+      return vector.getData().capacity();
     }
   }
 
@@ -265,12 +258,12 @@ public class VarLengthColumnReaders {
     }
 
     @Override
-    public boolean setSafe(int index, ByteBuf value, int start, int length) {
+    public boolean setSafe(int index, DrillBuf value, int start, int length) {
       boolean success;
       if(index >= varBinaryVector.getValueCapacity()) return false;
 
       if (usingDictionary) {
-        ByteBuf b = Unpooled.wrappedBuffer(currDictVal.toByteBuffer());
+        DrillBuf b = DrillBuf.wrapByteBuffer(currDictVal.toByteBuffer());
         int st=0;
         int len=currDictVal.length();
         VarBinaryHolder holder = new VarBinaryHolder();
@@ -309,14 +302,12 @@ public class VarLengthColumnReaders {
       nullableVarBinaryVector = v;
     }
 
-    public boolean setSafe(int index, ByteBuf value, int start, int length) {
+    public boolean setSafe(int index, DrillBuf value, int start, int length) {
       boolean success;
       if(index >= nullableVarBinaryVector.getValueCapacity()) return false;
 
       if (usingDictionary) {
-        ByteBuf b = Unpooled.wrappedBuffer(currDictVal.toByteBuffer());
-        int st=0;
-        int len=currDictVal.length();
+        DrillBuf b = DrillBuf.wrapByteBuffer(currDictVal.toByteBuffer());
         NullableVarBinaryHolder holder = new NullableVarBinaryHolder();
         holder.buffer=b;
         holder.start=0;
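
A note on the pattern running through the hunks above: each var-length
reader's setSafe(...) now takes a DrillBuf, and the nullable VarChar
reader writes through a cached Mutator instead of filling a holder
object per value. A minimal sketch of that contract (illustrative only;
`vector` and `mutator` are the fields introduced in the diff):

    public boolean setSafe(int index, DrillBuf value, int start, int length) {
      // Refuse the write when the batch is full so the caller can
      // allocate a new batch and retry, instead of throwing mid-read.
      if (index >= vector.getValueCapacity()) {
        return false;
      }
      // setSafe(index, isSet, start, end, buf) copies straight from the
      // DrillBuf, skipping the per-value NullableVarCharHolder the old
      // code allocated.
      return mutator.setSafe(index, 1, start, start + length, value);
    }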

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/5c5cef06/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/VarLengthValuesColumn.java
----------------------------------------------------------------------
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/VarLengthValuesColumn.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/VarLengthValuesColumn.java
index 7f16c64..829b44a 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/VarLengthValuesColumn.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/VarLengthValuesColumn.java
@@ -18,9 +18,12 @@
 package org.apache.drill.exec.store.parquet.columnreaders;
 
 import io.netty.buffer.ByteBuf;
+import io.netty.buffer.DrillBuf;
+
 import org.apache.drill.common.exceptions.ExecutionSetupException;
 import org.apache.drill.exec.vector.ValueVector;
 import org.apache.drill.exec.vector.VariableWidthVector;
+
 import parquet.bytes.BytesUtils;
 import parquet.column.ColumnDescriptor;
 import parquet.format.Encoding;
@@ -49,7 +52,7 @@ public abstract class VarLengthValuesColumn<V extends ValueVector> extends VarLe
     }
   }
 
-  public abstract boolean setSafe(int index, ByteBuf bytes, int start, int length);
+  public abstract boolean setSafe(int index, DrillBuf bytes, int start, int length);
 
   @Override
   protected void readField(long recordToRead) {
@@ -80,8 +83,7 @@ public abstract class VarLengthValuesColumn<V extends ValueVector> extends VarLe
   protected boolean readAndStoreValueSizeInformation() throws IOException {
     // re-purposing this field here for length in BYTES to prevent repetitive multiplication/division
     try {
-    dataTypeLengthInBits = BytesUtils.readIntLittleEndian(pageReader.pageDataByteArray.nioBuffer(),
-        (int) pageReader.readyToReadPosInBytes);
+      dataTypeLengthInBits = pageReader.pageDataByteArray.getInt((int) pageReader.readyToReadPosInBytes);
     } catch (Throwable t) {
       throw t;
     }
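
The last hunk above replaces BytesUtils.readIntLittleEndian(...) over a
copied NIO buffer with a direct read on the page buffer. A hedged sketch
of what the new read does, assuming Drill buffers default to
little-endian as the removed helper did (`pageData` and `offset` are
illustrative names):

    // Each variable-length value on a Parquet page is preceded by a
    // 4-byte little-endian length; getInt reads it in place, no copy.
    static int readLengthPrefix(DrillBuf pageData, long offset) {
      return pageData.getInt((int) offset);
    }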

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/5c5cef06/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet2/DrillParquetGroupConverter.java
----------------------------------------------------------------------
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet2/DrillParquetGroupConverter.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet2/DrillParquetGroupConverter.java
index f9bac65..c6310b1 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet2/DrillParquetGroupConverter.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet2/DrillParquetGroupConverter.java
@@ -17,12 +17,11 @@
  */
 package org.apache.drill.exec.store.parquet2;
 
-import com.google.common.collect.Lists;
+import io.netty.buffer.DrillBuf;
 
-import io.netty.buffer.ByteBuf;
-import io.netty.buffer.Unpooled;
+import java.math.BigDecimal;
+import java.util.List;
 
-import org.apache.drill.common.util.DecimalUtility;
 import org.apache.drill.exec.expr.holders.BigIntHolder;
 import org.apache.drill.exec.expr.holders.BitHolder;
 import org.apache.drill.exec.expr.holders.DateHolder;
@@ -37,8 +36,9 @@ import org.apache.drill.exec.expr.holders.TimeHolder;
 import org.apache.drill.exec.expr.holders.TimeStampHolder;
 import org.apache.drill.exec.expr.holders.VarBinaryHolder;
 import org.apache.drill.exec.expr.holders.VarCharHolder;
+import org.apache.drill.exec.physical.impl.OutputMutator;
 import org.apache.drill.exec.store.ParquetOutputRecordWriter;
-import org.apache.drill.exec.store.parquet.columnreaders.NullableFixedByteAlignedReaders;
+import org.apache.drill.exec.util.DecimalUtility;
 import org.apache.drill.exec.vector.complex.impl.ComplexWriterImpl;
 import org.apache.drill.exec.vector.complex.writer.BaseWriter.MapWriter;
 import org.apache.drill.exec.vector.complex.writer.BigIntWriter;
@@ -68,30 +68,31 @@ import parquet.schema.PrimitiveType;
 import parquet.schema.Type;
 import parquet.schema.Type.Repetition;
 
-import java.math.BigDecimal;
-import java.util.List;
+import com.google.common.collect.Lists;
 
 public class DrillParquetGroupConverter extends GroupConverter {
 
   private List<Converter> converters;
   private MapWriter mapWriter;
+  private final OutputMutator mutator;
 
-  public DrillParquetGroupConverter(ComplexWriterImpl complexWriter, MessageType schema) {
-    this(complexWriter.rootAsMap(), schema);
+  public DrillParquetGroupConverter(OutputMutator mutator, ComplexWriterImpl complexWriter, MessageType schema) {
+    this(mutator, complexWriter.rootAsMap(), schema);
   }
 
-  public DrillParquetGroupConverter(MapWriter mapWriter, GroupType schema) {
+  public DrillParquetGroupConverter(OutputMutator mutator, MapWriter mapWriter, GroupType schema) {
     this.mapWriter = mapWriter;
+    this.mutator = mutator;
     converters = Lists.newArrayList();
     for (Type type : schema.getFields()) {
       Repetition rep = type.getRepetition();
       boolean isPrimitive = type.isPrimitive();
       if (!isPrimitive) {
         if (rep != Repetition.REPEATED) {
-          DrillParquetGroupConverter converter = new DrillParquetGroupConverter(mapWriter.map(type.getName()), type.asGroupType());
+          DrillParquetGroupConverter converter = new DrillParquetGroupConverter(mutator, mapWriter.map(type.getName()), type.asGroupType());
           converters.add(converter);
         } else {
-          DrillParquetGroupConverter converter = new DrillParquetGroupConverter(mapWriter.list(type.getName()).map(), type.asGroupType());
+          DrillParquetGroupConverter converter = new DrillParquetGroupConverter(mutator, mapWriter.list(type.getName()).map(), type.asGroupType());
           converters.add(converter);
         }
       } else {
@@ -162,21 +163,21 @@ public class DrillParquetGroupConverter extends GroupConverter {
       case BINARY: {
         if (type.getOriginalType() == null) {
          VarBinaryWriter writer = type.getRepetition() == Repetition.REPEATED ? mapWriter.list(name).varBinary() : mapWriter.varBinary(name);
-          return new DrillVarBinaryConverter(writer);
+          return new DrillVarBinaryConverter(writer, mutator.getManagedBuffer());
         }
         switch(type.getOriginalType()) {
           case UTF8: {
            VarCharWriter writer = type.getRepetition() == Repetition.REPEATED ? mapWriter.list(name).varChar() : mapWriter.varChar(name);
-            return new DrillVarCharConverter(writer);
+            return new DrillVarCharConverter(writer, mutator.getManagedBuffer());
           }
           case DECIMAL: {
             DecimalMetadata metadata = type.getDecimalMetadata();
             if (metadata.getPrecision() <= 28) {
              Decimal28SparseWriter writer = type.getRepetition() == Repetition.REPEATED ? mapWriter.list(name).decimal28Sparse() : mapWriter.decimal28Sparse(name);
-              return new DrillBinaryToDecimal28Converter(writer, metadata.getPrecision(), metadata.getScale());
+              return new DrillBinaryToDecimal28Converter(writer, metadata.getPrecision(), metadata.getScale(), mutator.getManagedBuffer());
             } else {
              Decimal38SparseWriter writer = type.getRepetition() == Repetition.REPEATED ? mapWriter.list(name).decimal38Sparse() : mapWriter.decimal38Sparse(name);
-              return new DrillBinaryToDecimal38Converter(writer, metadata.getPrecision(), metadata.getScale());
+              return new DrillBinaryToDecimal38Converter(writer, metadata.getPrecision(), metadata.getScale(), mutator.getManagedBuffer());
             }
           }
           default: {
@@ -363,16 +364,18 @@ public class DrillParquetGroupConverter extends GroupConverter {
 
   public static class DrillVarBinaryConverter extends PrimitiveConverter {
     private VarBinaryWriter writer;
+    private DrillBuf buf;
     private VarBinaryHolder holder = new VarBinaryHolder();
 
-    public DrillVarBinaryConverter(VarBinaryWriter writer) {
+    public DrillVarBinaryConverter(VarBinaryWriter writer, DrillBuf buf) {
       this.writer = writer;
+      this.buf = buf;
     }
 
     @Override
     public void addBinary(Binary value) {
-      ByteBuf buf = Unpooled.wrappedBuffer(value.toByteBuffer());
-      holder.buffer = buf;
+      holder.buffer = buf = buf.reallocIfNeeded(value.length());
+      buf.setBytes(0, value.toByteBuffer());
       holder.start = 0;
       holder.end = value.length();
       writer.write(holder);
@@ -382,15 +385,17 @@ public class DrillParquetGroupConverter extends GroupConverter {
   public static class DrillVarCharConverter extends PrimitiveConverter {
     private VarCharWriter writer;
     private VarCharHolder holder = new VarCharHolder();
+    private DrillBuf buf;
 
-    public DrillVarCharConverter(VarCharWriter writer) {
+    public DrillVarCharConverter(VarCharWriter writer,  DrillBuf buf) {
       this.writer = writer;
+      this.buf = buf;
     }
 
     @Override
     public void addBinary(Binary value) {
-      ByteBuf buf = Unpooled.wrappedBuffer(value.toByteBuffer());
-      holder.buffer = buf;
+      holder.buffer = buf = buf.reallocIfNeeded(value.length());
+      buf.setBytes(0, value.toByteBuffer());
       holder.start = 0;
       holder.end = value.length();
       writer.write(holder);
@@ -400,9 +405,11 @@ public class DrillParquetGroupConverter extends GroupConverter {
   public static class DrillBinaryToDecimal28Converter extends PrimitiveConverter {
     private Decimal28SparseWriter writer;
     private Decimal28SparseHolder holder = new Decimal28SparseHolder();
+    private DrillBuf buf;
 
-    public DrillBinaryToDecimal28Converter(Decimal28SparseWriter writer, int precision, int scale) {
+    public DrillBinaryToDecimal28Converter(Decimal28SparseWriter writer, int precision, int scale,  DrillBuf buf) {
       this.writer = writer;
+      this.buf = buf.reallocIfNeeded(28);
       holder.precision = precision;
       holder.scale = scale;
     }
@@ -410,7 +417,6 @@ public class DrillParquetGroupConverter extends GroupConverter {
     @Override
     public void addBinary(Binary value) {
       BigDecimal bigDecimal = DecimalUtility.getBigDecimalFromByteArray(value.getBytes(), 0, value.length(), holder.scale);
-      ByteBuf buf = Unpooled.wrappedBuffer(new byte[28]);
       DecimalUtility.getSparseFromBigDecimal(bigDecimal, buf, 0, holder.scale, holder.precision, Decimal28SparseHolder.nDecimalDigits);
       holder.buffer = buf;
       writer.write(holder);
@@ -420,9 +426,11 @@ public class DrillParquetGroupConverter extends GroupConverter {
   public static class DrillBinaryToDecimal38Converter extends PrimitiveConverter {
     private Decimal38SparseWriter writer;
     private Decimal38SparseHolder holder = new Decimal38SparseHolder();
+    private DrillBuf buf;
 
-    public DrillBinaryToDecimal38Converter(Decimal38SparseWriter writer, int precision, int scale) {
+    public DrillBinaryToDecimal38Converter(Decimal38SparseWriter writer, int precision, int scale,  DrillBuf buf) {
       this.writer = writer;
+      this.buf = buf.reallocIfNeeded(38);
       holder.precision = precision;
       holder.scale = scale;
     }
@@ -430,7 +438,6 @@ public class DrillParquetGroupConverter extends GroupConverter {
     @Override
     public void addBinary(Binary value) {
       BigDecimal bigDecimal = DecimalUtility.getBigDecimalFromByteArray(value.getBytes(), 0, value.length(), holder.scale);
-      ByteBuf buf = Unpooled.wrappedBuffer(new byte[38]);
       DecimalUtility.getSparseFromBigDecimal(bigDecimal, buf, 0, holder.scale, holder.precision, Decimal38SparseHolder.nDecimalDigits);
       holder.buffer = buf;
       writer.write(holder);
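
All of the converter changes above follow one pattern: rather than
allocating an unpooled wrapper per value via Unpooled.wrappedBuffer, each
converter holds a single operator-managed DrillBuf and grows it on
demand. Condensed from the DrillVarBinaryConverter hunk:

    @Override
    public void addBinary(Binary value) {
      buf = buf.reallocIfNeeded(value.length()); // grow only when too small
      buf.setBytes(0, value.toByteBuffer());     // copy the value into the reused buffer
      holder.buffer = buf;
      holder.start = 0;
      holder.end = value.length();
      writer.write(holder);
    }

The decimal converters can size the buffer once in the constructor (28
and 38 bytes here) because the sparse decimal layout they fill has a
fixed width.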

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/5c5cef06/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet2/DrillParquetReader.java
----------------------------------------------------------------------
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet2/DrillParquetReader.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet2/DrillParquetReader.java
index f47acab..16f520c 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet2/DrillParquetReader.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet2/DrillParquetReader.java
@@ -156,7 +156,7 @@ public class DrillParquetReader implements RecordReader {
       }
 
       writer = new VectorContainerWriter(output);
-      recordMaterializer = new DrillParquetRecordMaterializer(writer, projection);
+      recordMaterializer = new DrillParquetRecordMaterializer(output, writer, projection);
       primitiveVectors = writer.getMapVector().getPrimitiveVectors();
       recordReader = columnIO.getRecordReader(pageReadStore, recordMaterializer);
     } catch (Exception e) {

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/5c5cef06/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet2/DrillParquetRecordMaterializer.java
----------------------------------------------------------------------
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet2/DrillParquetRecordMaterializer.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet2/DrillParquetRecordMaterializer.java
index 69893dc..f9c3480 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet2/DrillParquetRecordMaterializer.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet2/DrillParquetRecordMaterializer.java
@@ -17,24 +17,21 @@
  */
 package org.apache.drill.exec.store.parquet2;
 
-import org.apache.drill.exec.vector.ValueVector;
-import org.apache.drill.exec.vector.complex.MapVector;
-import org.apache.drill.exec.vector.complex.impl.ComplexWriterImpl;
+import org.apache.drill.exec.physical.impl.OutputMutator;
 import org.apache.drill.exec.vector.complex.writer.BaseWriter.ComplexWriter;
+
 import parquet.io.api.GroupConverter;
 import parquet.io.api.RecordMaterializer;
 import parquet.schema.MessageType;
 
-import java.util.List;
-
 public class DrillParquetRecordMaterializer extends RecordMaterializer<Void> {
 
   public DrillParquetGroupConverter root;
   private ComplexWriter complexWriter;
 
-  public DrillParquetRecordMaterializer(ComplexWriter complexWriter, MessageType schema) {
+  public DrillParquetRecordMaterializer(OutputMutator mutator, ComplexWriter complexWriter, MessageType schema) {
     this.complexWriter = complexWriter;
-    root = new DrillParquetGroupConverter(complexWriter.rootAsMap(), schema);
+    root = new DrillParquetGroupConverter(mutator, complexWriter.rootAsMap(), schema);
   }
 
   public void setPosition(int position) {

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/5c5cef06/exec/java-exec/src/main/java/org/apache/drill/exec/store/pojo/PojoRecordReader.java
----------------------------------------------------------------------
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/store/pojo/PojoRecordReader.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/store/pojo/PojoRecordReader.java
index 9a5920d..48e09aa 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/store/pojo/PojoRecordReader.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/store/pojo/PojoRecordReader.java
@@ -18,8 +18,10 @@
 package org.apache.drill.exec.store.pojo;
 
 import java.lang.reflect.Field;
+import java.lang.reflect.Modifier;
 import java.sql.Timestamp;
 import java.util.Iterator;
+import java.util.List;
 
 import org.apache.drill.common.exceptions.ExecutionSetupException;
 import org.apache.drill.exec.exception.SchemaChangeException;
@@ -35,8 +37,12 @@ import org.apache.drill.exec.store.pojo.Writers.NBigIntWriter;
 import org.apache.drill.exec.store.pojo.Writers.NBooleanWriter;
 import org.apache.drill.exec.store.pojo.Writers.NDoubleWriter;
 import org.apache.drill.exec.store.pojo.Writers.NIntWriter;
-import org.apache.drill.exec.store.pojo.Writers.StringWriter;
 import org.apache.drill.exec.store.pojo.Writers.NTimeStampWriter;
+import org.apache.drill.exec.store.pojo.Writers.StringWriter;
+
+import com.google.common.collect.Lists;
+
+
 
 public class PojoRecordReader<T> implements RecordReader{
   static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(PojoRecordReader.class);
@@ -67,42 +73,51 @@ public class PojoRecordReader<T> implements RecordReader{
   public void setup(OutputMutator output) throws ExecutionSetupException {
     try{
       Field[] fields = pojoClass.getDeclaredFields();
-      writers = new PojoWriter[fields.length];
-      for(int i = 0; i < writers.length; i++){
+      List<PojoWriter> writers = Lists.newArrayList();
+
+      for(int i = 0; i < fields.length; i++){
         Field f = fields[i];
-        Class<?> type = f.getType();
 
+        if(Modifier.isStatic(f.getModifiers())) continue;
+
+        Class<?> type = f.getType();
+        PojoWriter w = null;
         if(type == int.class){
-          writers[i] = new IntWriter(f);
+          w = new IntWriter(f);
         }else if(type == Integer.class){
-          writers[i] = new NIntWriter(f);
+          w = new NIntWriter(f);
         }else if(type == Long.class){
-          writers[i] = new NBigIntWriter(f);
+          w = new NBigIntWriter(f);
         }else if(type == Boolean.class){
-          writers[i] = new NBooleanWriter(f);
+          w = new NBooleanWriter(f);
         }else if(type == double.class){
-          writers[i] = new DoubleWriter(f);
+          w = new DoubleWriter(f);
         }else if(type == Double.class){
-          writers[i] = new NDoubleWriter(f);
+          w = new NDoubleWriter(f);
         }else if(type.isEnum()){
-          writers[i] = new EnumWriter(f);
+          w = new EnumWriter(f, output.getManagedBuffer());
         }else if(type == boolean.class){
-          writers[i] = new BitWriter(f);
+          w = new BitWriter(f);
         }else if(type == long.class){
-          writers[i] = new LongWriter(f);
+          w = new LongWriter(f);
         }else if(type == String.class){
-          writers[i] = new StringWriter(f);
+          w = new StringWriter(f, output.getManagedBuffer());
         }else if (type == Timestamp.class) {
-          writers[i] = new NTimeStampWriter(f);
+          w = new NTimeStampWriter(f);
         }else{
           throw new ExecutionSetupException(String.format("PojoRecord reader doesn't yet support conversions from type [%s].", type));
         }
-        writers[i].init(output);
+        writers.add(w);
+        w.init(output);
       }
+
+      this.writers = writers.toArray(new PojoWriter[writers.size()]);
+
     }catch(SchemaChangeException e){
       throw new ExecutionSetupException("Failure while setting up schema for 
PojoRecordReader.", e);
     }
 
+
   }
 
   private void allocate(){
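
The setup() rewrite above also fixes a latent bug:
pojoClass.getDeclaredFields() returns static fields (such as slf4j
loggers), which previously each received a column writer. The new shape,
sketched with a hypothetical createWriterFor(...) standing in for the
long if/else chain:

    List<PojoWriter> writers = Lists.newArrayList();
    for (Field f : pojoClass.getDeclaredFields()) {
      if (Modifier.isStatic(f.getModifiers())) {
        continue;                        // class-level state is not per-record data
      }
      PojoWriter w = createWriterFor(f); // hypothetical: the if/else chain above
      writers.add(w);
      w.init(output);
    }
    this.writers = writers.toArray(new PojoWriter[writers.size()]);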

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/5c5cef06/exec/java-exec/src/main/java/org/apache/drill/exec/store/pojo/Writers.java
----------------------------------------------------------------------
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/store/pojo/Writers.java 
b/exec/java-exec/src/main/java/org/apache/drill/exec/store/pojo/Writers.java
index 03732a0..8227695 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/pojo/Writers.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/pojo/Writers.java
@@ -17,8 +17,7 @@
  */
 package org.apache.drill.exec.store.pojo;
 
-import io.netty.buffer.ByteBuf;
-import io.netty.buffer.UnpooledByteBufAllocator;
+import io.netty.buffer.DrillBuf;
 
 import java.lang.reflect.Field;
 import java.sql.Timestamp;
@@ -34,8 +33,8 @@ import org.apache.drill.exec.vector.NullableBigIntVector;
 import org.apache.drill.exec.vector.NullableBitVector;
 import org.apache.drill.exec.vector.NullableFloat8Vector;
 import org.apache.drill.exec.vector.NullableIntVector;
-import org.apache.drill.exec.vector.NullableVarCharVector;
 import org.apache.drill.exec.vector.NullableTimeStampVector;
+import org.apache.drill.exec.vector.NullableVarCharVector;
 
 import com.google.common.base.Charsets;
 
@@ -105,23 +104,20 @@ public class Writers {
   }
 
   private abstract static class AbstractStringWriter extends AbstractWriter<NullableVarCharVector>{
-    private ByteBuf data;
+    private DrillBuf data;
     private final NullableVarCharHolder h = new NullableVarCharHolder();
 
-    public AbstractStringWriter(Field field) {
+    public AbstractStringWriter(Field field, DrillBuf managedBuf) {
       super(field, Types.optional(MinorType.VARCHAR));
+      this.data = managedBuf;
       ensureLength(100);
     }
 
     void ensureLength(int len){
-      if(data == null || data.capacity() < len){
-        if(data != null) data.release();
-        data = UnpooledByteBufAllocator.DEFAULT.buffer(len);
-      }
+      data = data.reallocIfNeeded(len);
     }
 
     public void cleanup(){
-      data.release();
     }
 
     public boolean writeString(String s, int outboundIndex) throws IllegalArgumentException, IllegalAccessException {
@@ -145,8 +141,8 @@ public class Writers {
   }
 
   public static class EnumWriter extends AbstractStringWriter{
-    public EnumWriter(Field field) {
-      super(field);
+    public EnumWriter(Field field, DrillBuf managedBuf) {
+      super(field, managedBuf);
       if(!field.getType().isEnum()) throw new IllegalStateException();
     }
 
@@ -159,8 +155,8 @@ public class Writers {
   }
 
   public static class StringWriter extends AbstractStringWriter {
-    public StringWriter(Field field) {
-      super(field);
+    public StringWriter(Field field, DrillBuf managedBuf) {
+      super(field, managedBuf);
       if(field.getType() != String.class) throw new IllegalStateException();
     }
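
Note that cleanup() becomes a no-op above: the DrillBuf now comes from
output.getManagedBuffer(), so its lifecycle presumably belongs to the
operator rather than to the writer. The writer only ever grows it:

    void ensureLength(int len) {
      // reallocIfNeeded returns this same buffer when it is already big
      // enough, otherwise a larger one from the same managed allocator;
      // the old buffer is never released here.
      data = data.reallocIfNeeded(len);
    }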
 

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/5c5cef06/exec/java-exec/src/main/java/org/apache/drill/exec/util/ByteBufUtil.java
----------------------------------------------------------------------
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/util/ByteBufUtil.java 
b/exec/java-exec/src/main/java/org/apache/drill/exec/util/ByteBufUtil.java
index e7bdbcf..37e6040 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/util/ByteBufUtil.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/util/ByteBufUtil.java
@@ -17,42 +17,19 @@
  */
 package org.apache.drill.exec.util;
 
-import static java.nio.ByteOrder.LITTLE_ENDIAN;
-import io.netty.buffer.ByteBuf;
-import io.netty.buffer.Unpooled;
+import io.netty.buffer.DrillBuf;
 
 import java.io.DataInput;
 
 import org.apache.drill.common.util.DrillStringUtils;
 
 public class ByteBufUtil {
-  /**
-   * Creates a wrapped {@link ByteBuf} of {@code size} number of bytes with
-   * the Drill's default endianness(LITTLE_ENDIAN).
-   *
-   * @param size
-   * @return
-   */
-  public static ByteBuf createBuffer(int size) {
-    return Unpooled.wrappedBuffer(new byte[size]).order(LITTLE_ENDIAN);
-  }
-
-  /**
-   * Creates a {@link ByteBuf} which wraps the provided byte array with
-   * the Drill's default endianness(LITTLE_ENDIAN).
-   *
-   * @param size
-   * @return
-   */
-  public static ByteBuf createBuffer(byte[] source) {
-    return Unpooled.wrappedBuffer(source).order(LITTLE_ENDIAN);
-  }
 
   /**
    * Verifies that the space provided in the buffer is of the specified size.
    * @throws IllegalArgumentException if the specified boundaries do not describe the expected size.
    */
-  public static void checkBufferLength(ByteBuf buffer, int start, int end, int requiredLen) {
+  public static void checkBufferLength(DrillBuf buffer, int start, int end, int requiredLen) {
     int actualLen = (end - start);
     if (actualLen != requiredLen) {
       throw new IllegalArgumentException(String.format("Wrong length %d(%d-%d) 
in the buffer '%s', expected %d.",
@@ -62,7 +39,7 @@ public class ByteBufUtil {
 
   /**
    * Modeled after {@code org.apache.hadoop.io.WritableUtils}.
-   * We copy the code to avoid wrapping {@link ByteBuf} to/from {@link DataInput}.
+   * We copy the code to avoid wrapping {@link DrillBuf} to/from {@link DataInput}.
    */
   public static class HadoopWritables {
     /**
@@ -76,10 +53,10 @@ public class ByteBufUtil {
      * is negative, with number of bytes that follow are -(v+124). Bytes are
      * stored in the high-non-zero-byte-first order.
      *
-     * @param buffer ByteBuf to read from
+     * @param buffer DrillBuf to read from
      * @param i Integer to be serialized
      */
-    public static void writeVInt(ByteBuf buffer,  int start, int end, int i) {
+    public static void writeVInt(DrillBuf buffer,  int start, int end, int i) {
       writeVLong(buffer, start, end, i);
     }
 
@@ -94,10 +71,10 @@ public class ByteBufUtil {
      * is negative, with number of bytes that follow are -(v+120). Bytes are
      * stored in the high-non-zero-byte-first order.
      *
-     * @param buffer ByteBuf to write to
+     * @param buffer DrillBuf to write to
      * @param i Long to be serialized
      */
-    public static void writeVLong(ByteBuf buffer, int start, int end, long i) {
+    public static void writeVLong(DrillBuf buffer, int start, int end, long i) {
       int availableBytes = (end-start);
       if (availableBytes < getVIntSize(i)) {
         throw new NumberFormatException("Expected " + getVIntSize(i) + " bytes 
but the buffer '"
@@ -136,10 +113,10 @@ public class ByteBufUtil {
 
     /**
      * Reads a zero-compressed encoded integer from input stream and returns it.
-     * @param buffer ByteBuf to read from
+     * @param buffer DrillBuf to read from
      * @return deserialized integer from stream.
      */
-    public static int readVInt(ByteBuf buffer, int start, int end) {
+    public static int readVInt(DrillBuf buffer, int start, int end) {
       long n = readVLong(buffer, start, end);
       if ((n > Integer.MAX_VALUE) || (n < Integer.MIN_VALUE)) {
         throw new NumberFormatException("Value " + n + " too long to fit in 
integer");
@@ -149,10 +126,10 @@ public class ByteBufUtil {
 
     /**
      * Reads a zero-compressed encoded long from input stream and returns it.
-     * @param buffer ByteBuf to read from
+     * @param buffer DrillBuf to read from
      * @return deserialized long from stream.
      */
-    public static long readVLong(ByteBuf buffer, int start, int end) {
+    public static long readVLong(DrillBuf buffer, int start, int end) {
       buffer.readerIndex(start);
       byte firstByte = buffer.readByte();
       int len = decodeVIntSize(firstByte);
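
To make the zero-compressed encoding concrete: the javadoc above
describes Hadoop's WritableUtils scheme, where small values occupy a
single byte and larger ones get a header byte encoding sign plus payload
length, with the payload stored high-byte-first. A standalone round-trip
sketch over a plain byte[] (the DrillBuf methods above differ only in
cursor handling):

    public final class VLongCodec {
      /** Writes v at buf[pos..]; returns the number of bytes written. */
      public static int writeVLong(byte[] buf, int pos, long v) {
        if (v >= -112 && v <= 127) {  // small values fit in one byte
          buf[pos] = (byte) v;
          return 1;
        }
        int len = -112;
        if (v < 0) {
          v ^= -1L;                   // one's complement of negatives
          len = -120;
        }
        for (long tmp = v; tmp != 0; tmp >>= 8) {
          len--;                      // count payload bytes
        }
        buf[pos] = (byte) len;        // header byte: sign + length
        int bytes = (len < -120) ? -(len + 120) : -(len + 112);
        for (int idx = bytes; idx != 0; idx--) {
          int shift = (idx - 1) * 8;  // high-non-zero-byte-first order
          buf[pos + 1 + (bytes - idx)] = (byte) ((v >> shift) & 0xFF);
        }
        return 1 + bytes;
      }

      /** Reads the value written by writeVLong at buf[pos..]. */
      public static long readVLong(byte[] buf, int pos) {
        byte first = buf[pos];
        if (first >= -112) {
          return first;               // single-byte value
        }
        boolean negative = first < -120;
        int bytes = negative ? -(first + 120) : -(first + 112);
        long v = 0;
        for (int i = 0; i < bytes; i++) {
          v = (v << 8) | (buf[pos + 1 + i] & 0xFF);
        }
        return negative ? (v ^ -1L) : v;
      }
    }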
