DRILL-1281: Read into Direct Memory in Parquet Reader. Requires Hadoop 2.4 or above


Project: http://git-wip-us.apache.org/repos/asf/incubator-drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-drill/commit/02e4824e
Tree: http://git-wip-us.apache.org/repos/asf/incubator-drill/tree/02e4824e
Diff: http://git-wip-us.apache.org/repos/asf/incubator-drill/diff/02e4824e

Branch: refs/heads/master
Commit: 02e4824ed9da6e15b5f974f16b15b408695c8ada
Parents: 654c879
Author: Parth Chandra <[email protected]>
Authored: Mon Jul 28 13:44:52 2014 -0700
Committer: Steven Phillips <[email protected]>
Committed: Mon Aug 18 14:46:26 2014 -0700

----------------------------------------------------------------------
 .../drill/common/util/DecimalUtility.java       |   9 +-
 contrib/storage-hbase/pom.xml                   |  48 +++++
 .../exec/store/hbase/HBaseRecordReader.java     |  14 ++
 .../drill/exec/store/hive/HiveRecordReader.java |  13 ++
 exec/java-exec/pom.xml                          | 112 ++++++++++--
 .../codegen/templates/HolderReaderImpl.java     |   1 -
 .../drill/exec/physical/impl/ScanBatch.java     |   6 +-
 .../exec/store/LocalSyncableFileSystem.java     |  12 ++
 .../apache/drill/exec/store/RecordReader.java   |   9 +
 .../exec/store/easy/json/JSONRecordReader2.java |  14 ++
 .../drill/exec/store/mock/MockRecordReader.java |  13 ++
 .../exec/store/parquet/ColumnDataReader.java    |  26 ++-
 .../store/parquet/ParquetScanBatchCreator.java  |   8 +-
 .../store/parquet/columnreaders/BitReader.java  |  11 +-
 .../parquet/columnreaders/ColumnReader.java     |   8 +-
 .../columnreaders/FixedByteAlignedReader.java   |  15 +-
 .../NullableFixedByteAlignedReaders.java        |  33 ++--
 .../NullableVarLengthValuesColumn.java          |   5 +-
 .../store/parquet/columnreaders/PageReader.java | 127 ++++++++++---
 .../columnreaders/ParquetRecordReader.java      |  13 ++
 .../columnreaders/VarLengthColumnReaders.java   |  98 +++++++---
 .../columnreaders/VarLengthValuesColumn.java    |   5 +-
 .../exec/store/parquet2/DrillParquetReader.java |   9 +
 .../drill/exec/store/pojo/PojoRecordReader.java |  10 ++
 .../exec/store/text/DrillTextRecordReader.java  |  14 +-
 .../parquet/hadoop/CodecFactoryExposer.java     |  96 +++++++++-
 .../physical/impl/writer/TestParquetWriter.java |   4 +-
 exec/jdbc/pom.xml                               |  14 +-
 pom.xml                                         | 180 +++++++++++--------
 29 files changed, 747 insertions(+), 180 deletions(-)
----------------------------------------------------------------------
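
This change reads Parquet page data into direct (off-heap) memory obtained from the
operator's allocator instead of into heap byte[] buffers, using the ByteBuffer read
path available from Hadoop 2.4. The following is a minimal, self-contained sketch of
that read pattern using plain NIO and Netty; it is illustrative only (the file name
and page size are hypothetical), not the committed code:

import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;

import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.file.Paths;
import java.nio.file.StandardOpenOption;

public class DirectReadSketch {
  // Read 'length' bytes from the channel into a direct, Netty-managed buffer.
  public static ByteBuf readIntoDirectBuffer(FileChannel in, int length) throws IOException {
    ByteBuf page = Unpooled.directBuffer(length);   // off-heap allocation
    ByteBuffer view = page.nioBuffer(0, length);    // NIO view over the same memory
    while (view.hasRemaining()) {
      if (in.read(view) < 0) {
        page.release();                             // free direct memory before failing
        throw new IOException("Unexpected end of stream");
      }
    }
    page.writerIndex(length);                       // expose the bytes through the ByteBuf API
    return page;                                    // caller must release()
  }

  public static void main(String[] args) throws IOException {
    try (FileChannel ch = FileChannel.open(Paths.get("example.parquet"), StandardOpenOption.READ)) {
      ByteBuf page = readIntoDirectBuffer(ch, 4096); // hypothetical page size
      try {
        System.out.println("first byte: " + page.getByte(0));
      } finally {
        page.release();
      }
    }
  }
}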


http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/02e4824e/common/src/main/java/org/apache/drill/common/util/DecimalUtility.java
----------------------------------------------------------------------
diff --git 
a/common/src/main/java/org/apache/drill/common/util/DecimalUtility.java 
b/common/src/main/java/org/apache/drill/common/util/DecimalUtility.java
index 8311d82..85ba918 100644
--- a/common/src/main/java/org/apache/drill/common/util/DecimalUtility.java
+++ b/common/src/main/java/org/apache/drill/common/util/DecimalUtility.java
@@ -553,13 +553,20 @@ public class DecimalUtility {
       return 0;
     }
 
-
     public static BigDecimal getBigDecimalFromByteArray(byte[] bytes, int start, int length, int scale) {
       byte[] value = Arrays.copyOfRange(bytes, start, start + length);
       BigInteger unscaledValue = new BigInteger(value);
       return new BigDecimal(unscaledValue, scale);
     }
 
+
+    public static BigDecimal getBigDecimalFromByteBuf(ByteBuf bytebuf, int start, int length, int scale) {
+      byte[] value = new byte[length];
+      bytebuf.getBytes(start, value, 0, length);
+      BigInteger unscaledValue = new BigInteger(value);
+      return new BigDecimal(unscaledValue, scale);
+    }
+
   public static void roundDecimal(ByteBuf result, int start, int nDecimalDigits, int desiredScale, int currentScale) {
     int newScaleRoundedUp  = org.apache.drill.common.util.DecimalUtility.roundUp(desiredScale);
     int origScaleRoundedUp = org.apache.drill.common.util.DecimalUtility.roundUp(currentScale);
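
A small usage sketch for the new getBigDecimalFromByteBuf helper above (the input
bytes and scale are made up; the byte layout is the same big-endian unscaled value
the byte[] variant expects):

import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;

import java.math.BigDecimal;

import org.apache.drill.common.util.DecimalUtility;

public class DecimalFromByteBufExample {
  public static void main(String[] args) {
    // 0x01 0x02 0x03 is the unscaled value 66051; with scale 2 this is 660.51
    ByteBuf buf = Unpooled.wrappedBuffer(new byte[]{0x01, 0x02, 0x03});
    BigDecimal d = DecimalUtility.getBigDecimalFromByteBuf(buf, 0, 3, 2);
    System.out.println(d); // prints 660.51
    buf.release();
  }
}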

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/02e4824e/contrib/storage-hbase/pom.xml
----------------------------------------------------------------------
diff --git a/contrib/storage-hbase/pom.xml b/contrib/storage-hbase/pom.xml
index c6043a3..82bf4bb 100644
--- a/contrib/storage-hbase/pom.xml
+++ b/contrib/storage-hbase/pom.xml
@@ -37,6 +37,12 @@
       <groupId>org.apache.drill.exec</groupId>
       <artifactId>drill-java-exec</artifactId>
       <version>${project.version}</version>
+      <exclusions>
+        <exclusion>
+          <artifactId>hadoop-client</artifactId>
+          <groupId>org.apache.hadoop</groupId>
+        </exclusion>
+      </exclusions>
     </dependency>
 
     <!-- Test dependencies -->
@@ -46,6 +52,17 @@
       <classifier>tests</classifier>
       <version>${project.version}</version>
       <scope>test</scope>
+      <!-- here we explicitly exclude hadoop v2.x dependencies
+           because HBase 0.94.x unit tests do not work with hadoop 2
+           test classes, e.g. MiniDFSCluster.
+           This will change once we move to HBase 0.98 or later.
+        -->
+      <exclusions>
+        <exclusion>
+          <artifactId>hadoop-client</artifactId>
+          <groupId>org.apache.hadoop</groupId>
+        </exclusion>
+      </exclusions>
     </dependency>
     <dependency>
       <groupId>org.apache.drill</groupId>
@@ -146,10 +163,37 @@
           </exclusions>
         </dependency>
         <dependency>
+          <!-- Contrary to other Drill modules, this HBase storage module
+               compiles against hadoop 1.x so that a mini HBase cluster can be
+               run for the unit tests.
+               This will change once we move to HBase 0.98 or later.
+            -->
+          <groupId>org.apache.hadoop</groupId>
+          <artifactId>hadoop-core</artifactId>
+          <version>1.2.1</version>
+          <exclusions>
+            <exclusion>
+              <artifactId>commons-logging</artifactId>
+              <groupId>commons-logging</groupId>
+            </exclusion>
+            <exclusion>
+              <artifactId>asm</artifactId>
+              <groupId>asm</groupId>
+            </exclusion>
+          </exclusions>
+        </dependency>
+        <!-- test dependencies -->
+        <dependency>
           <groupId>org.apache.hadoop</groupId>
           <artifactId>hadoop-test</artifactId>
           <version>1.2.1</version>
           <scope>test</scope>
+          <exclusions>
+            <exclusion>
+              <artifactId>commons-logging</artifactId>
+              <groupId>commons-logging</groupId>
+            </exclusion>
+          </exclusions>
         </dependency>
         <dependency>
           <groupId>org.apache.hbase</groupId>
@@ -230,6 +274,10 @@
               <artifactId>jruby-complete</artifactId>
               <groupId>org.jruby</groupId>
             </exclusion>
+            <exclusion>
+              <artifactId>hadoop-core</artifactId>
+              <groupId>org.apache</groupId>
+            </exclusion>
           </exclusions>
         </dependency>
         <dependency>

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/02e4824e/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseRecordReader.java
----------------------------------------------------------------------
diff --git 
a/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseRecordReader.java
 
b/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseRecordReader.java
index 204d486..954cc5a 100644
--- 
a/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseRecordReader.java
+++ 
b/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseRecordReader.java
@@ -33,6 +33,7 @@ import org.apache.drill.common.expression.SchemaPath;
 import org.apache.drill.exec.exception.SchemaChangeException;
 import org.apache.drill.exec.memory.OutOfMemoryException;
 import org.apache.drill.exec.ops.FragmentContext;
+import org.apache.drill.exec.ops.OperatorContext;
 import org.apache.drill.exec.physical.impl.OutputMutator;
 import org.apache.drill.exec.record.MaterializedField;
 import org.apache.drill.exec.store.RecordReader;
@@ -70,12 +71,16 @@ public class HBaseRecordReader implements RecordReader, 
DrillHBaseConstants {
   private Scan hbaseScan;
   private Configuration hbaseConf;
   private Result leftOver;
+  private FragmentContext fragmentContext;
+  private OperatorContext operatorContext;
+
 
   public HBaseRecordReader(Configuration conf, HBaseSubScan.HBaseSubScanSpec 
subScanSpec,
       List<SchemaPath> projectedColumns, FragmentContext context) throws 
OutOfMemoryException {
     hbaseConf = conf;
     hbaseTable = subScanSpec.getTableName();
     hbaseScan = new Scan(subScanSpec.getStartRow(), subScanSpec.getStopRow());
+    fragmentContext=context;
     boolean rowKeyOnly = true;
     this.columns = Sets.newLinkedHashSet();
     if (projectedColumns != null && projectedColumns.size() != 0) {
@@ -117,6 +122,15 @@ public class HBaseRecordReader implements RecordReader, 
DrillHBaseConstants {
     hbaseScan.setCaching(TARGET_RECORD_COUNT);
   }
 
+  public OperatorContext getOperatorContext() {
+    return operatorContext;
+  }
+
+  public void setOperatorContext(OperatorContext operatorContext) {
+    this.operatorContext = operatorContext;
+  }
+
+
   @Override
   public void setup(OutputMutator output) throws ExecutionSetupException {
     this.outputMutator = output;

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/02e4824e/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveRecordReader.java
----------------------------------------------------------------------
diff --git 
a/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveRecordReader.java
 
b/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveRecordReader.java
index ac0fe36..36e55d9 100644
--- 
a/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveRecordReader.java
+++ 
b/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveRecordReader.java
@@ -34,6 +34,7 @@ import org.apache.drill.common.types.Types;
 import org.apache.drill.exec.exception.SchemaChangeException;
 import org.apache.drill.exec.expr.TypeHelper;
 import org.apache.drill.exec.ops.FragmentContext;
+import org.apache.drill.exec.ops.OperatorContext;
 import org.apache.drill.exec.physical.impl.OutputMutator;
 import org.apache.drill.exec.record.MaterializedField;
 import org.apache.drill.exec.store.RecordReader;
@@ -101,6 +102,9 @@ public class HiveRecordReader implements RecordReader {
   protected Object redoRecord;
   protected boolean empty;
   private Map<String, String> hiveConfigOverride;
+  private FragmentContext fragmentContext;
+  private OperatorContext operatorContext;
+
 
   protected static final int TARGET_RECORD_COUNT = 4000;
   protected static final int FIELD_SIZE = 50;
@@ -114,6 +118,7 @@ public class HiveRecordReader implements RecordReader {
     this.projectedColumns = projectedColumns;
     this.empty = (inputSplit == null && partition == null);
     this.hiveConfigOverride = hiveConfigOverride;
+    this.fragmentContext=context;
     init();
   }
 
@@ -229,6 +234,14 @@ public class HiveRecordReader implements RecordReader {
     }
   }
 
+  public OperatorContext getOperatorContext() {
+    return operatorContext;
+  }
+
+  public void setOperatorContext(OperatorContext operatorContext) {
+    this.operatorContext = operatorContext;
+  }
+
   @Override
   public void setup(OutputMutator output) throws ExecutionSetupException {
     try {

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/02e4824e/exec/java-exec/pom.xml
----------------------------------------------------------------------
diff --git a/exec/java-exec/pom.xml b/exec/java-exec/pom.xml
index 5465f12..21dfc67 100644
--- a/exec/java-exec/pom.xml
+++ b/exec/java-exec/pom.xml
@@ -49,6 +49,11 @@
       <version>2.1</version>
     </dependency>
     <dependency>
+        <groupId>org.apache.commons</groupId>
+        <artifactId>commons-math</artifactId>
+        <version>2.2</version>
+    </dependency>
+    <dependency>
       <groupId>com.thoughtworks.paranamer</groupId>
       <artifactId>paranamer</artifactId>
       <version>2.5.6</version>
@@ -120,31 +125,106 @@
     <dependency>
       <groupId>com.twitter</groupId>
       <artifactId>parquet-column</artifactId>
-      <version>1.5.0-SNAPSHOT</version>
+      <version>1.5.1-drill-r2</version>
       <exclusions>
         <exclusion>
           <groupId>org.apache.hadoop</groupId>
-          <artifactId>hadoop-core</artifactId>
+          <artifactId>hadoop-client</artifactId>
         </exclusion>
         <exclusion>
           <groupId>org.apache.hadoop</groupId>
-          <artifactId>hadoop-client</artifactId>
+          <artifactId>hadoop-common</artifactId>
         </exclusion>
       </exclusions>
     </dependency>
     <dependency>
       <groupId>com.twitter</groupId>
       <artifactId>parquet-hadoop</artifactId>
-      <version>1.5.0-SNAPSHOT</version>
+      <version>1.5.1-drill-r2</version>
       <exclusions>
         <exclusion>
           <groupId>org.apache.hadoop</groupId>
-          <artifactId>hadoop-core</artifactId>
+          <artifactId>hadoop-client</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>org.apache.hadoop</groupId>
+          <artifactId>hadoop-common</artifactId>
         </exclusion>
+      </exclusions>
+    </dependency>
+    <dependency>
+      <groupId>com.twitter</groupId>
+      <artifactId>parquet-format</artifactId>
+      <version>2.1.1-SNAPSHOT</version>
+      <exclusions>
         <exclusion>
           <groupId>org.apache.hadoop</groupId>
           <artifactId>hadoop-client</artifactId>
         </exclusion>
+        <exclusion>
+          <groupId>org.apache.hadoop</groupId>
+          <artifactId>hadoop-common</artifactId>
+        </exclusion>
+      </exclusions>
+    </dependency>
+    <dependency>
+      <groupId>com.twitter</groupId>
+      <artifactId>parquet-common</artifactId>
+      <version>1.5.1-drill-r2</version>
+      <exclusions>
+        <exclusion>
+          <groupId>org.apache.hadoop</groupId>
+          <artifactId>hadoop-client</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>org.apache.hadoop</groupId>
+          <artifactId>hadoop-common</artifactId>
+        </exclusion>
+      </exclusions>
+    </dependency>
+    <dependency>
+      <groupId>com.twitter</groupId>
+      <artifactId>parquet-jackson</artifactId>
+      <version>1.5.1-drill-r2</version>
+      <exclusions>
+        <exclusion>
+          <groupId>org.apache.hadoop</groupId>
+          <artifactId>hadoop-client</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>org.apache.hadoop</groupId>
+          <artifactId>hadoop-common</artifactId>
+        </exclusion>
+      </exclusions>
+    </dependency>
+    <dependency>
+      <groupId>com.twitter</groupId>
+      <artifactId>parquet-encoding</artifactId>
+      <version>1.5.1-drill-r2</version>
+      <exclusions>
+        <exclusion>
+          <groupId>org.apache.hadoop</groupId>
+          <artifactId>hadoop-client</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>org.apache.hadoop</groupId>
+          <artifactId>hadoop-common</artifactId>
+        </exclusion>
+      </exclusions>
+    </dependency>
+    <dependency>
+      <groupId>com.twitter</groupId>
+      <artifactId>parquet-generator</artifactId>
+      <version>1.5.1-drill-r2</version>
+      <exclusions>
+        <exclusion>
+          <groupId>org.apache.hadoop</groupId>
+          <artifactId>hadoop-client</artifactId>
+        </exclusion>
+        <exclusion>
+          <groupId>org.apache.hadoop</groupId>
+          <artifactId>hadoop-common</artifactId>
+        </exclusion>
       </exclusions>
     </dependency>
     <dependency>
@@ -267,7 +347,7 @@
       <dependencies>
         <dependency>
           <groupId>org.apache.hadoop</groupId>
-          <artifactId>hadoop-core</artifactId>
+          <artifactId>hadoop-client</artifactId>
           <exclusions>
             <exclusion>
               <groupId>com.sun.jersey</groupId>
@@ -279,7 +359,6 @@
             </exclusion>
           </exclusions>
         </dependency>
-
       </dependencies>
     </profile>
     <profile>
@@ -287,17 +366,28 @@
       <dependencies>
         <dependency>
           <groupId>org.apache.hadoop</groupId>
-          <artifactId>hadoop-core</artifactId>
+          <artifactId>hadoop-client</artifactId>
         </dependency>
         <dependency>
           <groupId>com.mapr.hadoop</groupId>
-          <artifactId>maprfs</artifactId>
-          <version>1.0.3-mapr-3.0.0</version>
+            <artifactId>maprfs</artifactId>
+            <version>2.4.1-mapr-4.0.0-SNAPSHOT</version>
+            <exclusions>
+             <exclusion>
+               <artifactId>commons-logging</artifactId>
+               <groupId>commons-logging</groupId>
+             </exclusion>
+            </exclusions>
+        </dependency>
+        <dependency>
+          <groupId>org.json</groupId>
+          <artifactId>json</artifactId>
+          <version>20090211</version>
         </dependency>
         <dependency>
           <groupId>org.apache.hbase</groupId>
           <artifactId>hbase</artifactId>
-          <version>0.94.13-mapr-1401-m7-3.0.2</version>
+          <version>0.94.17-mapr-1405-4.0.0-FCS</version>
           <exclusions>
             <exclusion>
               <groupId>org.slf4j</groupId>

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/02e4824e/exec/java-exec/src/main/codegen/templates/HolderReaderImpl.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/codegen/templates/HolderReaderImpl.java 
b/exec/java-exec/src/main/codegen/templates/HolderReaderImpl.java
index 5f718a0..bd862a1 100644
--- a/exec/java-exec/src/main/codegen/templates/HolderReaderImpl.java
+++ b/exec/java-exec/src/main/codegen/templates/HolderReaderImpl.java
@@ -44,7 +44,6 @@ import java.math.BigInteger;
 import org.apache.drill.exec.expr.holders.*;
 import org.apache.hadoop.io.Text;
 import org.joda.time.Period;
-import org.mortbay.jetty.servlet.Holder;
 
 @SuppressWarnings("unused")
 public class ${holderMode}${name}HolderReaderImpl extends AbstractFieldReader {

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/02e4824e/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScanBatch.java
----------------------------------------------------------------------
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScanBatch.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScanBatch.java
index eaf26d1..4b7e33e 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScanBatch.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScanBatch.java
@@ -95,6 +95,7 @@ public class ScanBatch implements RecordBatch {
       throw new ExecutionSetupException("A scan batch must contain at least 
one reader.");
     this.currentReader = readers.next();
     this.oContext = new OperatorContext(subScanConfig, context);
+    this.currentReader.setOperatorContext(this.oContext);
     this.currentReader.setup(mutator);
     this.partitionColumns = partitionColumns.iterator();
     this.partitionValues = this.partitionColumns.hasNext() ? 
this.partitionColumns.next() : null;
@@ -108,11 +109,14 @@ public class ScanBatch implements RecordBatch {
     this(subScanConfig, context, readers, Collections.<String[]> emptyList(), 
Collections.<Integer> emptyList());
   }
 
-  @Override
   public FragmentContext getContext() {
     return context;
   }
 
+  public OperatorContext getOperatorContext() {
+    return oContext;
+  }
+
   @Override
   public BatchSchema getSchema() {
     return schema;

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/02e4824e/exec/java-exec/src/main/java/org/apache/drill/exec/store/LocalSyncableFileSystem.java
----------------------------------------------------------------------
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/store/LocalSyncableFileSystem.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/store/LocalSyncableFileSystem.java
index 1e55c73..37b9c9d 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/store/LocalSyncableFileSystem.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/store/LocalSyncableFileSystem.java
@@ -144,6 +144,18 @@ public class LocalSyncableFileSystem extends FileSystem {
     }
 
     @Override
+    public void hsync() throws IOException {
+      output.flush();
+      fos.getFD().sync();
+    }
+
+    @Override
+    public void hflush() throws IOException {
+      output.flush();
+      fos.getFD().sync();
+    }
+
+    @Override
     public void write(int b) throws IOException {
       output.write(b);
     }

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/02e4824e/exec/java-exec/src/main/java/org/apache/drill/exec/store/RecordReader.java
----------------------------------------------------------------------
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/store/RecordReader.java 
b/exec/java-exec/src/main/java/org/apache/drill/exec/store/RecordReader.java
index fb8a014..1745421 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/RecordReader.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/RecordReader.java
@@ -18,6 +18,7 @@
 package org.apache.drill.exec.store;
 
 import org.apache.drill.common.exceptions.ExecutionSetupException;
+import org.apache.drill.exec.ops.OperatorContext;
 import org.apache.drill.exec.physical.impl.OutputMutator;
 
 public interface RecordReader {
@@ -36,6 +37,14 @@ public interface RecordReader {
   public abstract void setup(OutputMutator output) throws 
ExecutionSetupException;
 
   /**
+   * Sets the operator context. The reader can use it to access the operator context
+   * and to allocate direct memory if needed.
+   * @param operatorContext
+   */
+  public abstract void setOperatorContext(OperatorContext operatorContext);
+
+
+  /**
    * Increment record reader forward, writing into the provided output batch.  
    * 
    * @return The number of additional records added to the output.
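
The new setOperatorContext hook exists so that a reader can reach the operator-level
allocator and take direct memory from it. Below is a rough sketch of the intended
usage pattern; the class and helper methods are hypothetical, and only
setOperatorContext and getAllocator().buffer(...) come from this change:

import io.netty.buffer.ByteBuf;

import org.apache.drill.exec.ops.OperatorContext;

public class DirectAllocatingReaderSketch {
  private OperatorContext operatorContext;

  public void setOperatorContext(OperatorContext operatorContext) {
    this.operatorContext = operatorContext;
  }

  // Direct memory comes out of the operator's allocator, so it is accounted per operator.
  ByteBuf allocatePageBuffer(int size) {
    return operatorContext.getAllocator().buffer(size);
  }

  // The reader owns the buffers it allocates and must release them when done.
  void releasePageBuffer(ByteBuf buf) {
    buf.release();
  }
}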

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/02e4824e/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/JSONRecordReader2.java
----------------------------------------------------------------------
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/JSONRecordReader2.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/JSONRecordReader2.java
index 3e2c81c..cd3cfdc 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/JSONRecordReader2.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/JSONRecordReader2.java
@@ -28,6 +28,7 @@ import org.apache.drill.common.expression.SchemaPath;
 import org.apache.drill.exec.exception.SchemaChangeException;
 import org.apache.drill.exec.memory.OutOfMemoryException;
 import org.apache.drill.exec.ops.FragmentContext;
+import org.apache.drill.exec.ops.OperatorContext;
 import org.apache.drill.exec.physical.impl.OutputMutator;
 import org.apache.drill.exec.store.RecordReader;
 import org.apache.drill.exec.vector.complex.fn.JsonReader;
@@ -50,11 +51,15 @@ public class JSONRecordReader2 implements RecordReader{
   private InputStream stream;
   private JsonReaderWithState jsonReader;
   private int recordCount;
+  private FragmentContext fragmentContext;
+  private OperatorContext operatorContext;
+
 
   public JSONRecordReader2(FragmentContext fragmentContext, String inputPath, 
FileSystem fileSystem,
                           List<SchemaPath> columns) throws 
OutOfMemoryException {
     this.hadoopPath = new Path(inputPath);
     this.fileSystem = fileSystem;
+    this.fragmentContext=fragmentContext;
   }
 
   @Override
@@ -80,6 +85,15 @@ public class JSONRecordReader2 implements RecordReader{
     throw new DrillRuntimeException(sb.toString(), e);
   }
 
+
+  public OperatorContext getOperatorContext() {
+    return operatorContext;
+  }
+
+  public void setOperatorContext(OperatorContext operatorContext) {
+    this.operatorContext = operatorContext;
+  }
+
   @Override
   public int next() {
     writer.allocate();

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/02e4824e/exec/java-exec/src/main/java/org/apache/drill/exec/store/mock/MockRecordReader.java
----------------------------------------------------------------------
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/store/mock/MockRecordReader.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/store/mock/MockRecordReader.java
index c7fc939..81505ae 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/store/mock/MockRecordReader.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/store/mock/MockRecordReader.java
@@ -27,6 +27,7 @@ import org.apache.drill.exec.expr.TypeHelper;
 import org.apache.drill.exec.memory.BufferAllocator;
 import org.apache.drill.exec.memory.OutOfMemoryException;
 import org.apache.drill.exec.ops.FragmentContext;
+import org.apache.drill.exec.ops.OperatorContext;
 import org.apache.drill.exec.physical.impl.OutputMutator;
 import org.apache.drill.exec.record.MaterializedField;
 import org.apache.drill.exec.store.RecordReader;
@@ -47,10 +48,14 @@ public class MockRecordReader implements RecordReader {
   private ValueVector[] valueVectors;
   private int recordsRead;
   private int batchRecordCount;
+  private FragmentContext fragmentContext;
+  private OperatorContext operatorContext;
+
 
   public MockRecordReader(FragmentContext context, MockScanEntry config) 
throws OutOfMemoryException {
     this.context = context;
     this.config = config;
+    this.fragmentContext=context;
   }
 
   private int getEstimatedRecordSize(MockColumn[] types) {
@@ -69,6 +74,14 @@ public class MockRecordReader implements RecordReader {
 
   }
 
+  public OperatorContext getOperatorContext() {
+    return operatorContext;
+  }
+
+  public void setOperatorContext(OperatorContext operatorContext) {
+    this.operatorContext = operatorContext;
+  }
+
   @Override
   public void setup(OutputMutator output) throws ExecutionSetupException {
     try {

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/02e4824e/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ColumnDataReader.java
----------------------------------------------------------------------
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ColumnDataReader.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ColumnDataReader.java
index 80fbd80..2567c89 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ColumnDataReader.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ColumnDataReader.java
@@ -17,9 +17,16 @@
  */
 package org.apache.drill.exec.store.parquet;
 
+import java.io.EOFException;
 import java.io.IOException;
 import java.io.OutputStream;
+import java.nio.ByteBuffer;
+import java.nio.channels.Channels;
+import java.nio.channels.WritableByteChannel;
 
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.Unpooled;
+import org.apache.drill.common.exceptions.DrillRuntimeException;
 import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
@@ -27,6 +34,7 @@ import org.apache.hadoop.fs.Path;
 import parquet.bytes.BytesInput;
 import parquet.format.PageHeader;
 import parquet.format.Util;
+import parquet.hadoop.util.CompatibilityUtil;
 
 public class ColumnDataReader {
   static final org.slf4j.Logger logger = 
org.slf4j.LoggerFactory.getLogger(ColumnDataReader.class);
@@ -43,13 +51,26 @@ public class ColumnDataReader {
   public PageHeader readPageHeader() throws IOException{
     return Util.readPageHeader(input);
   }
-  
+
   public BytesInput getPageAsBytesInput(int pageLength) throws IOException{
     byte[] b = new byte[pageLength];
     input.read(b);
     return new HadoopBytesInput(b);
   }
-  
+
+  public ByteBuf getPageAsBytesBuf(ByteBuf byteBuf, int pageLength) throws IOException{
+    ByteBuffer directBuffer=byteBuf.nioBuffer(0, pageLength);
+    int l=directBuffer.remaining();
+    int bl=byteBuf.capacity();
+    try{
+      CompatibilityUtil.getBuf(input, directBuffer, pageLength);
+    }catch(Exception e) {
+      logger.error("Failed to read data into Direct ByteBuffer with exception: 
"+e.getMessage());
+      throw new DrillRuntimeException(e.getMessage());
+    }
+    return byteBuf;
+  }
+
   public void clear(){
     try{
       input.close();
@@ -87,4 +108,5 @@ public class ColumnDataReader {
     }
     
   }
+
 }

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/02e4824e/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetScanBatchCreator.java
----------------------------------------------------------------------
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetScanBatchCreator.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetScanBatchCreator.java
index 5edd3c5..a453e66 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetScanBatchCreator.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetScanBatchCreator.java
@@ -143,7 +143,13 @@ public class ParquetScanBatchCreator implements 
BatchCreator<ParquetRowGroupScan
       }
     }
 
-    return new ScanBatch(rowGroupScan, context, readers.iterator(), partitionColumns, selectedPartitionColumns);
+    ScanBatch s = new ScanBatch(rowGroupScan, context, readers.iterator(), partitionColumns, selectedPartitionColumns);
+
+    for(RecordReader r  : readers){
+      r.setOperatorContext(s.getOperatorContext());
+    }
+
+    return s;
   }
 
   private static boolean isComplex(ParquetMetadata footer) {

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/02e4824e/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/BitReader.java
----------------------------------------------------------------------
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/BitReader.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/BitReader.java
index 2c6e488..43e270a 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/BitReader.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/BitReader.java
@@ -17,6 +17,7 @@
  */
 package org.apache.drill.exec.store.parquet.columnreaders;
 
+import io.netty.buffer.ByteBuf;
 import org.apache.drill.common.exceptions.ExecutionSetupException;
 import org.apache.drill.exec.vector.BaseDataValueVector;
 import org.apache.drill.exec.vector.ValueVector;
@@ -28,7 +29,7 @@ final class BitReader extends ColumnReader {
 
   private byte currentByte;
   private byte nextByte;
-  private byte[] bytes;
+  private ByteBuf bytebuf;
   
   BitReader(ParquetRecordReader parentReader, int allocateSize, 
ColumnDescriptor descriptor, ColumnChunkMetaData columnChunkMetaData,
             boolean fixedLength, ValueVector v, SchemaElement schemaElement) 
throws ExecutionSetupException {
@@ -45,15 +46,15 @@ final class BitReader extends ColumnReader {
     readLengthInBits = recordsReadInThisIteration * dataTypeLengthInBits;
     readLength = (int) Math.ceil(readLengthInBits / 8.0);
 
-    bytes = pageReader.pageDataByteArray;
+    bytebuf = pageReader.pageDataByteArray;
     // standard read, using memory mapping
     if (pageReader.bitShift == 0) {
-      ((BaseDataValueVector) valueVec).getData().writeBytes(bytes,
+      ((BaseDataValueVector) valueVec).getData().writeBytes(bytebuf,
           (int) readStartInBytes, (int) readLength);
     } else { // read in individual values, because a bitshift is necessary 
with where the last page or batch ended
 
       vectorData = ((BaseDataValueVector) valueVec).getData();
-      nextByte = bytes[(int) Math.max(0, Math.ceil(pageReader.valuesRead / 8.0) - 1)];
+      nextByte = bytebuf.getByte((int) Math.max(0, Math.ceil(pageReader.valuesRead / 8.0) - 1));
       readLengthInBits = recordsReadInThisIteration + pageReader.bitShift;
 
       int i = 0;
@@ -66,7 +67,7 @@ final class BitReader extends ColumnReader {
         // if we are not on the last byte
         if ((int) Math.ceil(pageReader.valuesRead / 8.0) + i < 
pageReader.byteLength) {
           // grab the next byte from the buffer, shift and mask it, and OR it 
with the leftover bits
-          nextByte = bytes[(int) Math.ceil(pageReader.valuesRead / 8.0) + i];
+          nextByte = bytebuf.getByte((int) Math.ceil(pageReader.valuesRead / 8.0) + i);
           currentByte = (byte) (currentByte | nextByte
               << (8 - pageReader.bitShift)
               & ParquetRecordReader.endBitMasks[8 - pageReader.bitShift - 1]);

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/02e4824e/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ColumnReader.java
----------------------------------------------------------------------
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ColumnReader.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ColumnReader.java
index fd672d6..b240407 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ColumnReader.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ColumnReader.java
@@ -100,10 +100,12 @@ public abstract class ColumnReader<V extends ValueVector> 
{
 
   public void processPages(long recordsToReadInThisPass) throws IOException {
     reset();
-    do {
-      determineSize(recordsToReadInThisPass, 0);
+    if(recordsToReadInThisPass>0) {
+      do {
+        determineSize(recordsToReadInThisPass, 0);
 
-    } while (valuesReadInCurrentPass < recordsToReadInThisPass && pageReader.currentPage != null);
+      } while (valuesReadInCurrentPass < recordsToReadInThisPass && pageReader.currentPage != null);
+    }
     valueVec.getMutator().setValueCount(valuesReadInCurrentPass);
   }
 

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/02e4824e/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/FixedByteAlignedReader.java
----------------------------------------------------------------------
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/FixedByteAlignedReader.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/FixedByteAlignedReader.java
index 4513aaa..5c25e11 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/FixedByteAlignedReader.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/FixedByteAlignedReader.java
@@ -17,6 +17,7 @@
  */
 package org.apache.drill.exec.store.parquet.columnreaders;
 
+import io.netty.buffer.ByteBuf;
 import org.apache.drill.common.exceptions.ExecutionSetupException;
 import org.apache.drill.common.util.DecimalUtility;
 import org.apache.drill.exec.expr.holders.Decimal28SparseHolder;
@@ -35,7 +36,7 @@ import java.math.BigDecimal;
 
 class FixedByteAlignedReader extends ColumnReader {
 
-  protected byte[] bytes;
+  protected ByteBuf bytebuf;
 
   
   FixedByteAlignedReader(ParquetRecordReader parentReader, int allocateSize, 
ColumnDescriptor descriptor, ColumnChunkMetaData columnChunkMetaData,
@@ -54,13 +55,13 @@ class FixedByteAlignedReader extends ColumnReader {
     readLengthInBits = recordsReadInThisIteration * dataTypeLengthInBits;
     readLength = (int) Math.ceil(readLengthInBits / 8.0);
 
-    bytes = pageReader.pageDataByteArray;
+    bytebuf = pageReader.pageDataByteArray;
     // vectorData is assigned by the superclass read loop method
     writeData();
   }
 
   protected void writeData() {
-    vectorData.writeBytes(bytes,
+    vectorData.writeBytes(bytebuf,
         (int) readStartInBytes, (int) readLength);
   }
 
@@ -81,7 +82,7 @@ class FixedByteAlignedReader extends ColumnReader {
     }
 
     /**
-     * Reads from bytes, converts, and writes to buffer
+     * Reads from bytebuf, converts, and writes to buffer
      * @param start the index in bytes to start reading from
      * @param index the index of the ValueVector
      */
@@ -101,7 +102,7 @@ class FixedByteAlignedReader extends ColumnReader {
     @Override
     void addNext(int start, int index) {
       dateVector.getMutator().set(index, DateTimeUtils.fromJulianDay(
-          
NullableFixedByteAlignedReaders.NullableDateReader.readIntLittleEndian(bytes, 
start)
+          
NullableFixedByteAlignedReaders.NullableDateReader.readIntLittleEndian(bytebuf, 
start)
               - ParquetOutputRecordWriter.JULIAN_DAY_EPOC - 0.5));
     }
   }
@@ -119,7 +120,7 @@ class FixedByteAlignedReader extends ColumnReader {
     @Override
     void addNext(int start, int index) {
       int width = Decimal28SparseHolder.WIDTH;
-      BigDecimal intermediate = 
DecimalUtility.getBigDecimalFromByteArray(bytes, start, dataTypeLengthInBytes, 
schemaElement.getScale());
+      BigDecimal intermediate = 
DecimalUtility.getBigDecimalFromByteBuf(bytebuf, start, dataTypeLengthInBytes, 
schemaElement.getScale());
       DecimalUtility.getSparseFromBigDecimal(intermediate, 
decimal28Vector.getData(), index * width, schemaElement.getScale(),
               schemaElement.getPrecision(), 
Decimal28SparseHolder.nDecimalDigits);
     }
@@ -138,7 +139,7 @@ class FixedByteAlignedReader extends ColumnReader {
     @Override
     void addNext(int start, int index) {
       int width = Decimal38SparseHolder.WIDTH;
-      BigDecimal intermediate = 
DecimalUtility.getBigDecimalFromByteArray(bytes, start, dataTypeLengthInBytes, 
schemaElement.getScale());
+      BigDecimal intermediate = 
DecimalUtility.getBigDecimalFromByteBuf(bytebuf, start, dataTypeLengthInBytes, 
schemaElement.getScale());
       DecimalUtility.getSparseFromBigDecimal(intermediate, 
decimal38Vector.getData(), index * width, schemaElement.getScale(),
               schemaElement.getPrecision(), 
Decimal38SparseHolder.nDecimalDigits);
     }

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/02e4824e/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/NullableFixedByteAlignedReaders.java
----------------------------------------------------------------------
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/NullableFixedByteAlignedReaders.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/NullableFixedByteAlignedReaders.java
index c1575de..4d7f312 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/NullableFixedByteAlignedReaders.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/NullableFixedByteAlignedReaders.java
@@ -17,6 +17,7 @@
  */
 package org.apache.drill.exec.store.parquet.columnreaders;
 
+import io.netty.buffer.ByteBuf;
 import org.apache.drill.common.exceptions.ExecutionSetupException;
 import org.apache.drill.common.util.DecimalUtility;
 import org.apache.drill.exec.vector.NullableBigIntVector;
@@ -41,7 +42,7 @@ import java.math.BigDecimal;
 public class NullableFixedByteAlignedReaders {
 
   static class NullableFixedByteAlignedReader extends NullableColumnReader {
-    protected byte[] bytes;
+    protected ByteBuf bytebuf;
 
     NullableFixedByteAlignedReader(ParquetRecordReader parentReader, int 
allocateSize, ColumnDescriptor descriptor,
                       ColumnChunkMetaData columnChunkMetaData, boolean 
fixedLength, ValueVector v, SchemaElement schemaElement) throws 
ExecutionSetupException {
@@ -57,16 +58,16 @@ public class NullableFixedByteAlignedReaders {
       this.readStartInBytes = pageReader.readPosInBytes;
       this.readLengthInBits = recordsReadInThisIteration * 
dataTypeLengthInBits;
       this.readLength = (int) Math.ceil(readLengthInBits / 8.0);
-      this.bytes = pageReader.pageDataByteArray;
+      this.bytebuf = pageReader.pageDataByteArray;
 
       // fill in data.
-      vectorData.writeBytes(bytes, (int) readStartInBytes, (int) readLength);
+      vectorData.writeBytes(bytebuf, (int) readStartInBytes, (int) readLength);
     }
   }
 
   static class NullableDictionaryIntReader extends 
NullableColumnReader<NullableIntVector> {
 
-    private byte[] bytes;
+    private ByteBuf bytebuf;
 
     NullableDictionaryIntReader(ParquetRecordReader parentReader, int 
allocateSize, ColumnDescriptor descriptor,
                                 ColumnChunkMetaData columnChunkMetaData, 
boolean fixedLength, NullableIntVector v,
@@ -87,7 +88,7 @@ public class NullableFixedByteAlignedReaders {
 
   static class NullableDictionaryBigIntReader extends 
NullableColumnReader<NullableBigIntVector> {
 
-    private byte[] bytes;
+    private ByteBuf bytebuf;
 
     NullableDictionaryBigIntReader(ParquetRecordReader parentReader, int 
allocateSize, ColumnDescriptor descriptor,
                                    ColumnChunkMetaData columnChunkMetaData, 
boolean fixedLength, NullableBigIntVector v,
@@ -106,7 +107,7 @@ public class NullableFixedByteAlignedReaders {
 
   static class NullableDictionaryFloat4Reader extends 
NullableColumnReader<NullableFloat4Vector> {
 
-    private byte[] bytes;
+    private ByteBuf bytebuf;
 
     NullableDictionaryFloat4Reader(ParquetRecordReader parentReader, int 
allocateSize, ColumnDescriptor descriptor,
                                    ColumnChunkMetaData columnChunkMetaData, 
boolean fixedLength, NullableFloat4Vector v,
@@ -125,7 +126,7 @@ public class NullableFixedByteAlignedReaders {
 
   static class NullableDictionaryFloat8Reader extends 
NullableColumnReader<NullableFloat8Vector> {
 
-    private byte[] bytes;
+    private ByteBuf bytebuf;
 
     NullableDictionaryFloat8Reader(ParquetRecordReader parentReader, int 
allocateSize, ColumnDescriptor descriptor,
                                   ColumnChunkMetaData columnChunkMetaData, 
boolean fixedLength, NullableFloat8Vector v,
@@ -160,7 +161,7 @@ public class NullableFixedByteAlignedReaders {
       this.readStartInBytes = pageReader.readPosInBytes;
       this.readLengthInBits = recordsReadInThisIteration * 
dataTypeLengthInBits;
       this.readLength = (int) Math.ceil(readLengthInBits / 8.0);
-      this.bytes = pageReader.pageDataByteArray;
+      this.bytebuf = pageReader.pageDataByteArray;
 
       dataTypeLengthInBytes = (int) Math.ceil(dataTypeLengthInBits / 8.0);
       for (int i = 0; i < recordsReadInThisIteration; i++) {
@@ -183,15 +184,15 @@ public class NullableFixedByteAlignedReaders {
 
     @Override
     void addNext(int start, int index) {
-      dateVector.getMutator().set(index, 
DateTimeUtils.fromJulianDay(readIntLittleEndian(bytes, start) - 
ParquetOutputRecordWriter.JULIAN_DAY_EPOC - 0.5));
+      dateVector.getMutator().set(index, 
DateTimeUtils.fromJulianDay(readIntLittleEndian(bytebuf, start) - 
ParquetOutputRecordWriter.JULIAN_DAY_EPOC - 0.5));
     }
 
     // copied out of parquet library, didn't want to deal with the unneeded throws statement they had declared
-    public static int readIntLittleEndian(byte[] in, int offset) {
-      int ch4 = in[offset] & 0xff;
-      int ch3 = in[offset + 1] & 0xff;
-      int ch2 = in[offset + 2] & 0xff;
-      int ch1 = in[offset + 3] & 0xff;
+    public static int readIntLittleEndian(ByteBuf in, int offset) {
+      int ch4 = in.getByte(offset) & 0xff;
+      int ch3 = in.getByte(offset + 1) & 0xff;
+      int ch2 = in.getByte(offset + 2) & 0xff;
+      int ch1 = in.getByte(offset + 3) & 0xff;
       return ((ch1 << 24) + (ch2 << 16) + (ch3 << 8) + (ch4 << 0));
     }
 
@@ -210,7 +211,7 @@ public class NullableFixedByteAlignedReaders {
     @Override
     void addNext(int start, int index) {
       int width = NullableDecimal28SparseHolder.WIDTH;
-      BigDecimal intermediate = 
DecimalUtility.getBigDecimalFromByteArray(bytes, start, dataTypeLengthInBytes, 
schemaElement.getScale());
+      BigDecimal intermediate = 
DecimalUtility.getBigDecimalFromByteBuf(bytebuf, start, dataTypeLengthInBytes, 
schemaElement.getScale());
       DecimalUtility.getSparseFromBigDecimal(intermediate, 
decimal28Vector.getData(), index * width, schemaElement.getScale(),
           schemaElement.getPrecision(), 
NullableDecimal28SparseHolder.nDecimalDigits);
     }
@@ -229,7 +230,7 @@ public class NullableFixedByteAlignedReaders {
     @Override
     void addNext(int start, int index) {
       int width = NullableDecimal38SparseHolder.WIDTH;
-      BigDecimal intermediate = 
DecimalUtility.getBigDecimalFromByteArray(bytes, start, dataTypeLengthInBytes, 
schemaElement.getScale());
+      BigDecimal intermediate = 
DecimalUtility.getBigDecimalFromByteBuf(bytebuf, start, dataTypeLengthInBytes, 
schemaElement.getScale());
       DecimalUtility.getSparseFromBigDecimal(intermediate, 
decimal38Vector.getData(), index * width, schemaElement.getScale(),
           schemaElement.getPrecision(), 
NullableDecimal38SparseHolder.nDecimalDigits);
     }
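
A quick standalone check of the little-endian helper above (the method body is copied
from the reader; the input bytes are illustrative):

import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;

public class LittleEndianCheck {
  static int readIntLittleEndian(ByteBuf in, int offset) {
    int ch4 = in.getByte(offset) & 0xff;
    int ch3 = in.getByte(offset + 1) & 0xff;
    int ch2 = in.getByte(offset + 2) & 0xff;
    int ch1 = in.getByte(offset + 3) & 0xff;
    return ((ch1 << 24) + (ch2 << 16) + (ch3 << 8) + (ch4 << 0));
  }

  public static void main(String[] args) {
    // little-endian bytes 0x78 0x56 0x34 0x12 decode to 0x12345678
    ByteBuf buf = Unpooled.wrappedBuffer(new byte[]{0x78, 0x56, 0x34, 0x12});
    System.out.println(Integer.toHexString(readIntLittleEndian(buf, 0))); // 12345678
    buf.release();
  }
}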

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/02e4824e/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/NullableVarLengthValuesColumn.java
----------------------------------------------------------------------
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/NullableVarLengthValuesColumn.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/NullableVarLengthValuesColumn.java
index 2be9a37..83812ea 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/NullableVarLengthValuesColumn.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/NullableVarLengthValuesColumn.java
@@ -17,6 +17,7 @@
  
******************************************************************************/
 package org.apache.drill.exec.store.parquet.columnreaders;
 
+import io.netty.buffer.ByteBuf;
 import org.apache.drill.common.exceptions.ExecutionSetupException;
 import org.apache.drill.exec.vector.ValueVector;
 import parquet.bytes.BytesUtils;
@@ -37,7 +38,7 @@ public abstract class NullableVarLengthValuesColumn<V extends 
ValueVector> exten
     super(parentReader, allocateSize, descriptor, columnChunkMetaData, 
fixedLength, v, schemaElement);
   }
 
-  public abstract boolean setSafe(int index, byte[] value, int start, int length);
+  public abstract boolean setSafe(int index, ByteBuf value, int start, int length);
 
   public abstract int capacity();
 
@@ -81,7 +82,7 @@ public abstract class NullableVarLengthValuesColumn<V extends 
ValueVector> exten
     }
     else {
       // re-purposing  this field here for length in BYTES to prevent 
repetitive multiplication/division
-      dataTypeLengthInBits = BytesUtils.readIntLittleEndian(pageReader.pageDataByteArray,
+      dataTypeLengthInBits = BytesUtils.readIntLittleEndian(pageReader.pageDataByteArray.nioBuffer(),
           (int) pageReader.readyToReadPosInBytes);
     }
     // I think this also needs to happen if it is null for the random access

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/02e4824e/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/PageReader.java
----------------------------------------------------------------------
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/PageReader.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/PageReader.java
index 1d300bb..1687c3b 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/PageReader.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/PageReader.java
@@ -18,7 +18,12 @@
 package org.apache.drill.exec.store.parquet.columnreaders;
 
 import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
 
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.Unpooled;
+import org.apache.drill.common.exceptions.DrillRuntimeException;
 import org.apache.drill.common.exceptions.ExecutionSetupException;
 import org.apache.drill.exec.store.parquet.ColumnDataReader;
 import org.apache.drill.exec.store.parquet.ParquetFormatPlugin;
@@ -37,6 +42,7 @@ import parquet.format.PageHeader;
 import parquet.format.PageType;
 import parquet.format.Util;
 import parquet.hadoop.metadata.ColumnChunkMetaData;
+import parquet.hadoop.metadata.CompressionCodecName;
 import parquet.schema.PrimitiveType;
 
 // class to keep track of the read position of variable length columns
@@ -48,7 +54,7 @@ final class PageReader {
   // store references to the pages that have been uncompressed, but not copied 
to ValueVectors yet
   Page currentPage;
   // buffer to store bytes of current page
-  byte[] pageDataByteArray;
+  ByteBuf pageDataByteArray;
 
   // for variable length data we need to keep track of our current position in 
the page data
   // as the values and lengths are intermixed, making random access to the 
length data impossible
@@ -81,8 +87,12 @@ final class PageReader {
   Dictionary dictionary;
   PageHeader pageHeader = null;
 
-  PageReader(ColumnReader parentStatus, FileSystem fs, Path path, ColumnChunkMetaData columnChunkMetaData) throws ExecutionSetupException{
+  List<ByteBuf> allocatedBuffers;
+
+  PageReader(ColumnReader parentStatus, FileSystem fs, Path path, ColumnChunkMetaData columnChunkMetaData)
+    throws ExecutionSetupException{
     this.parentColumnReader = parentStatus;
+    allocatedBuffers = new ArrayList<ByteBuf>();
 
     long totalByteLength = columnChunkMetaData.getTotalUncompressedSize();
     long start = columnChunkMetaData.getFirstDataPageOffset();
@@ -93,11 +103,25 @@ final class PageReader {
         f.seek(columnChunkMetaData.getDictionaryPageOffset());
         PageHeader pageHeader = Util.readPageHeader(f);
         assert pageHeader.type == PageType.DICTIONARY_PAGE;
-        BytesInput bytesIn = 
parentColumnReader.parentReader.getCodecFactoryExposer()
-            .decompress( //
-                
dataReader.getPageAsBytesInput(pageHeader.compressed_page_size), //
-                pageHeader.getUncompressed_page_size(), //
-                parentColumnReader.columnChunkMetaData.getCodec());
+
+        BytesInput bytesIn;
+        ByteBuf uncompressedData=allocateBuffer(pageHeader.getUncompressed_page_size());
+        allocatedBuffers.add(uncompressedData);
+        if(parentColumnReader.columnChunkMetaData.getCodec()==CompressionCodecName.UNCOMPRESSED) {
+          dataReader.getPageAsBytesBuf(uncompressedData, pageHeader.compressed_page_size);
+          bytesIn=parentColumnReader.parentReader.getCodecFactoryExposer().getBytesInput(uncompressedData,
+            pageHeader.getUncompressed_page_size());
+        }else{
+          ByteBuf compressedData=allocateBuffer(pageHeader.compressed_page_size);
+          dataReader.getPageAsBytesBuf(compressedData, pageHeader.compressed_page_size);
+          bytesIn = parentColumnReader.parentReader.getCodecFactoryExposer()
+            .decompress(parentColumnReader.columnChunkMetaData.getCodec(),
+              compressedData,
+              uncompressedData,
+              pageHeader.compressed_page_size,
+              pageHeader.getUncompressed_page_size());
+          compressedData.release();
+        }
         DictionaryPage page = new DictionaryPage(
             bytesIn,
             pageHeader.uncompressed_page_size,
@@ -107,7 +131,8 @@ final class PageReader {
         this.dictionary = 
page.getEncoding().initDictionary(parentStatus.columnDescriptor, page);
       }
     } catch (IOException e) {
-      throw new ExecutionSetupException("Error opening or reading metatdata 
for parquet file at location: " + path.getName(), e);
+      throw new ExecutionSetupException("Error opening or reading metadata for 
parquet file at location: "
+        + path.getName(), e);
     }
     
   }
@@ -137,11 +162,26 @@ final class PageReader {
     do {
       pageHeader = dataReader.readPageHeader();
       if (pageHeader.getType() == PageType.DICTIONARY_PAGE) {
-        BytesInput bytesIn = parentColumnReader.parentReader.getCodecFactoryExposer()
-            .decompress( //
-                dataReader.getPageAsBytesInput(pageHeader.compressed_page_size), //
-                pageHeader.getUncompressed_page_size(), //
-                parentColumnReader.columnChunkMetaData.getCodec());
+
+        //TODO: Handle buffer allocation exception
+        BytesInput bytesIn;
+        ByteBuf uncompressedData=allocateBuffer(pageHeader.getUncompressed_page_size());
+        allocatedBuffers.add(uncompressedData);
+        if( parentColumnReader.columnChunkMetaData.getCodec()== CompressionCodecName.UNCOMPRESSED) {
+          dataReader.getPageAsBytesBuf(uncompressedData, pageHeader.compressed_page_size);
+          bytesIn=parentColumnReader.parentReader.getCodecFactoryExposer().getBytesInput(uncompressedData,
+            pageHeader.getUncompressed_page_size());
+        }else{
+          ByteBuf compressedData=allocateBuffer(pageHeader.compressed_page_size);
+          dataReader.getPageAsBytesBuf(compressedData, pageHeader.compressed_page_size);
+          bytesIn = parentColumnReader.parentReader.getCodecFactoryExposer()
+            .decompress(parentColumnReader.columnChunkMetaData.getCodec(),
+              compressedData,
+              uncompressedData,
+              pageHeader.compressed_page_size,
+              pageHeader.getUncompressed_page_size());
+          compressedData.release();
+        }
         DictionaryPage page = new DictionaryPage(
             bytesIn,
             pageHeader.uncompressed_page_size,
@@ -152,11 +192,25 @@ final class PageReader {
       }
     } while (pageHeader.getType() == PageType.DICTIONARY_PAGE);
 
-    BytesInput bytesIn = parentColumnReader.parentReader.getCodecFactoryExposer()
-        .decompress( //
-            dataReader.getPageAsBytesInput(pageHeader.compressed_page_size), //
-            pageHeader.getUncompressed_page_size(), //
-            parentColumnReader.columnChunkMetaData.getCodec());
+    //TODO: Handle buffer allocation exception
+    BytesInput bytesIn;
+    ByteBuf uncompressedData=allocateBuffer(pageHeader.getUncompressed_page_size());
+    allocatedBuffers.add(uncompressedData);
+    if(parentColumnReader.columnChunkMetaData.getCodec()==CompressionCodecName.UNCOMPRESSED) {
+      dataReader.getPageAsBytesBuf(uncompressedData, pageHeader.compressed_page_size);
+      bytesIn=parentColumnReader.parentReader.getCodecFactoryExposer().getBytesInput(uncompressedData,
+        pageHeader.getUncompressed_page_size());
+    }else{
+      ByteBuf compressedData=allocateBuffer(pageHeader.compressed_page_size);
+      dataReader.getPageAsBytesBuf(compressedData, pageHeader.compressed_page_size);
+      bytesIn = parentColumnReader.parentReader.getCodecFactoryExposer()
+        .decompress(parentColumnReader.columnChunkMetaData.getCodec(),
+          compressedData,
+          uncompressedData,
+          pageHeader.compressed_page_size,
+          pageHeader.getUncompressed_page_size());
+      compressedData.release();
+    }
     currentPage = new Page(
         bytesIn,
         pageHeader.data_page_header.num_values,
@@ -172,12 +226,12 @@ final class PageReader {
       return false;
     }
 
-    pageDataByteArray = currentPage.getBytes().toByteArray();
+    pageDataByteArray = Unpooled.wrappedBuffer(currentPage.getBytes().toByteBuffer());
 
     readPosInBytes = 0;
     if (parentColumnReader.getColumnDescriptor().getMaxRepetitionLevel() > 0) {
      repetitionLevels = currentPage.getRlEncoding().getValuesReader(parentColumnReader.columnDescriptor, ValuesType.REPETITION_LEVEL);
-      repetitionLevels.initFromPage(currentPage.getValueCount(), pageDataByteArray, (int) readPosInBytes);
+      repetitionLevels.initFromPage(currentPage.getValueCount(), pageDataByteArray.nioBuffer(), (int) readPosInBytes);
      // we know that the first value will be a 0, at the end of each list of repeated values we will hit another 0 indicating
      // a new record, although we don't know the length until we hit it (and this is a one way stream of integers) so we
      // read the first zero here to simplify the reading processes, and start reading the first value the same as all
@@ -191,23 +245,23 @@ final class PageReader {
       if (!currentPage.getValueEncoding().usesDictionary()) {
         parentColumnReader.usingDictionary = false;
        definitionLevels = currentPage.getDlEncoding().getValuesReader(parentColumnReader.columnDescriptor, ValuesType.DEFINITION_LEVEL);
-        definitionLevels.initFromPage(currentPage.getValueCount(), pageDataByteArray, (int) readPosInBytes);
+        definitionLevels.initFromPage(currentPage.getValueCount(), pageDataByteArray.nioBuffer(), (int) readPosInBytes);
         readPosInBytes = definitionLevels.getNextOffset();
        if (parentColumnReader.columnDescriptor.getType() == PrimitiveType.PrimitiveTypeName.BOOLEAN) {
          valueReader = currentPage.getValueEncoding().getValuesReader(parentColumnReader.columnDescriptor, ValuesType.VALUES);
-          valueReader.initFromPage(currentPage.getValueCount(), pageDataByteArray, (int) readPosInBytes);
+          valueReader.initFromPage(currentPage.getValueCount(), pageDataByteArray.nioBuffer(), (int) readPosInBytes);
         }
       } else {
         parentColumnReader.usingDictionary = true;
        definitionLevels = currentPage.getDlEncoding().getValuesReader(parentColumnReader.columnDescriptor, ValuesType.DEFINITION_LEVEL);
-        definitionLevels.initFromPage(currentPage.getValueCount(), pageDataByteArray, (int) readPosInBytes);
+        definitionLevels.initFromPage(currentPage.getValueCount(), pageDataByteArray.nioBuffer(), (int) readPosInBytes);
         readPosInBytes = definitionLevels.getNextOffset();
        // initialize two of the dictionary readers, one is for determining the lengths of each value, the second is for
         // actually copying the values out into the vectors
        dictionaryLengthDeterminingReader = new DictionaryValuesReader(dictionary);
-        dictionaryLengthDeterminingReader.initFromPage(currentPage.getValueCount(), pageDataByteArray, (int) readPosInBytes);
+        dictionaryLengthDeterminingReader.initFromPage(currentPage.getValueCount(), pageDataByteArray.nioBuffer(), (int) readPosInBytes);
         dictionaryValueReader = new DictionaryValuesReader(dictionary);
-        dictionaryValueReader.initFromPage(currentPage.getValueCount(), pageDataByteArray, (int) readPosInBytes);
+        dictionaryValueReader.initFromPage(currentPage.getValueCount(), pageDataByteArray.nioBuffer(), (int) readPosInBytes);
         this.parentColumnReader.usingDictionary = true;
       }
     }
@@ -221,5 +275,28 @@ final class PageReader {
   
   public void clear(){
     this.dataReader.clear();
+    // Free all memory, including fixed length types. (Data is being copied for all types not just var length types)
+    //if(!this.parentColumnReader.isFixedLength) {
+      for (ByteBuf b : allocatedBuffers) {
+        b.release();
+      }
+    //}
   }
+
+  /*
+    Allocate direct memory to read data into
+   */
+  private ByteBuf allocateBuffer(int size) {
+    ByteBuf b;
+    try {
+      b = parentColumnReader.parentReader.getOperatorContext().getAllocator().buffer(size);
+      //b = UnpooledByteBufAllocator.DEFAULT.heapBuffer(size);
+    }catch(Exception e){
+      throw new DrillRuntimeException("Unable to allocate "+size+" bytes of 
memory in the Parquet Reader."+
+        "[Exception: "+e.getMessage()+"]"
+      );
+    }
+    return b;
+  }
+
 }

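For context on the Netty buffer calls used in the PageReader hunks above: page data is now held as a ByteBuf (Unpooled.wrappedBuffer over the page's ByteBuffer) and handed to parquet's ValuesReaders as an NIO view via nioBuffer(), so no byte[] copy is made. A minimal, self-contained sketch of that wrap-and-view pattern follows (illustrative only, not part of the commit; the names are made up):

import java.nio.ByteBuffer;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;

public class PageWrapSketch {
  public static void main(String[] args) {
    // Stand-in for currentPage.getBytes().toByteBuffer()
    ByteBuffer pageBytes = ByteBuffer.wrap(new byte[] {1, 2, 3, 4});
    // Wrap without copying -- this is what pageDataByteArray now holds
    ByteBuf pageData = Unpooled.wrappedBuffer(pageBytes);
    // NIO view, as passed to initFromPage(...) in the hunks above
    ByteBuffer view = pageData.nioBuffer();
    System.out.println(view.remaining());   // 4 -- same bytes, no copy
    pageData.release();                     // ByteBufs are reference counted
  }
}
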
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/02e4824e/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ParquetRecordReader.java
----------------------------------------------------------------------
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ParquetRecordReader.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ParquetRecordReader.java
index 2228787..4e9ac81 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ParquetRecordReader.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ParquetRecordReader.java
@@ -31,6 +31,7 @@ import org.apache.drill.common.types.TypeProtos.MajorType;
 import org.apache.drill.exec.exception.SchemaChangeException;
 import org.apache.drill.exec.expr.TypeHelper;
 import org.apache.drill.exec.ops.FragmentContext;
+import org.apache.drill.exec.ops.OperatorContext;
 import org.apache.drill.exec.physical.impl.OutputMutator;
 import org.apache.drill.exec.record.MaterializedField;
 import org.apache.drill.exec.store.RecordReader;
@@ -82,6 +83,8 @@ public class ParquetRecordReader implements RecordReader {
   private VarLenBinaryReader varLengthReader;
   private ParquetMetadata footer;
   private List<SchemaPath> columns;
+  private FragmentContext fragmentContext;
+  private OperatorContext operatorContext;
   private final CodecFactoryExposer codecFactoryExposer;
   int rowGroupIndex;
 
@@ -107,6 +110,8 @@ public class ParquetRecordReader implements RecordReader {
     this.batchSize = batchSize;
     this.footer = footer;
     this.columns = columns;
+    this.fragmentContext=fragmentContext;
+
   }
 
   public CodecFactoryExposer getCodecFactoryExposer() {
@@ -133,6 +138,14 @@ public class ParquetRecordReader implements RecordReader {
     return batchSize;
   }
 
+  public OperatorContext getOperatorContext() {
+    return operatorContext;
+  }
+
+  public void setOperatorContext(OperatorContext operatorContext) {
+    this.operatorContext = operatorContext;
+  }
+
   /**
    * @param type a fixed length type from the parquet library enum
    * @return the length in pageDataByteArray of the type

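The getter/setter added above is what lets PageReader pull its page buffers from the operator's direct-memory allocator (getOperatorContext().getAllocator().buffer(size)) and release them in clear(). A rough, self-contained sketch of that allocate/release lifecycle, using Netty's pooled allocator as a stand-in for Drill's BufferAllocator (illustrative only, not part of the commit):

import io.netty.buffer.ByteBuf;
import io.netty.buffer.PooledByteBufAllocator;

public class DirectBufferLifecycleSketch {
  public static void main(String[] args) {
    // Stand-in for getOperatorContext().getAllocator().buffer(size)
    ByteBuf pageBuffer = PooledByteBufAllocator.DEFAULT.directBuffer(4096);
    try {
      pageBuffer.writeBytes(new byte[] {1, 2, 3});    // stand-in for getPageAsBytesBuf()
      System.out.println(pageBuffer.readableBytes()); // 3
    } finally {
      pageBuffer.release();   // mirrors the release loop in PageReader.clear()
    }
  }
}
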
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/02e4824e/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/VarLengthColumnReaders.java
----------------------------------------------------------------------
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/VarLengthColumnReaders.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/VarLengthColumnReaders.java
index 979e8c3..555cb94 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/VarLengthColumnReaders.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/VarLengthColumnReaders.java
@@ -19,8 +19,14 @@ package org.apache.drill.exec.store.parquet.columnreaders;
 
 import java.math.BigDecimal;
 
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.Unpooled;
 import org.apache.drill.common.exceptions.ExecutionSetupException;
 import org.apache.drill.common.util.DecimalUtility;
+import org.apache.drill.exec.expr.holders.VarCharHolder;
+import org.apache.drill.exec.expr.holders.NullableVarCharHolder;
+import org.apache.drill.exec.expr.holders.VarBinaryHolder;
+import org.apache.drill.exec.expr.holders.NullableVarBinaryHolder;
 import org.apache.drill.exec.expr.holders.Decimal28SparseHolder;
 import org.apache.drill.exec.expr.holders.Decimal38SparseHolder;
 import org.apache.drill.exec.vector.Decimal28SparseVector;
@@ -50,9 +56,9 @@ public class VarLengthColumnReaders {
     }
 
     @Override
-    public boolean setSafe(int index, byte[] bytes, int start, int length) {
+    public boolean setSafe(int index, ByteBuf bytebuf, int start, int length) {
       int width = Decimal28SparseHolder.WIDTH;
-      BigDecimal intermediate = DecimalUtility.getBigDecimalFromByteArray(bytes, start, length, schemaElement.getScale());
+      BigDecimal intermediate = DecimalUtility.getBigDecimalFromByteBuf(bytebuf, start, length, schemaElement.getScale());
       if (index >= decimal28Vector.getValueCapacity()) {
         return false;
       }
@@ -79,9 +85,9 @@ public class VarLengthColumnReaders {
     }
 
     @Override
-    public boolean setSafe(int index, byte[] bytes, int start, int length) {
+    public boolean setSafe(int index, ByteBuf bytebuf, int start, int length) {
       int width = Decimal28SparseHolder.WIDTH;
-      BigDecimal intermediate = DecimalUtility.getBigDecimalFromByteArray(bytes, start, length, schemaElement.getScale());
+      BigDecimal intermediate = DecimalUtility.getBigDecimalFromByteBuf(bytebuf, start, length, schemaElement.getScale());
       if (index >= nullableDecimal28Vector.getValueCapacity()) {
         return false;
       }
@@ -109,9 +115,9 @@ public class VarLengthColumnReaders {
     }
 
     @Override
-    public boolean setSafe(int index, byte[] bytes, int start, int length) {
+    public boolean setSafe(int index, ByteBuf bytebuf, int start, int length) {
       int width = Decimal38SparseHolder.WIDTH;
-      BigDecimal intermediate = DecimalUtility.getBigDecimalFromByteArray(bytes, start, length, schemaElement.getScale());
+      BigDecimal intermediate = DecimalUtility.getBigDecimalFromByteBuf(bytebuf, start, length, schemaElement.getScale());
       if (index >= decimal28Vector.getValueCapacity()) {
         return false;
       }
@@ -138,9 +144,9 @@ public class VarLengthColumnReaders {
     }
 
     @Override
-    public boolean setSafe(int index, byte[] bytes, int start, int length) {
+    public boolean setSafe(int index, ByteBuf bytebuf, int start, int length) {
       int width = Decimal38SparseHolder.WIDTH;
-      BigDecimal intermediate = DecimalUtility.getBigDecimalFromByteArray(bytes, start, length, schemaElement.getScale());
+      BigDecimal intermediate = DecimalUtility.getBigDecimalFromByteBuf(bytebuf, start, length, schemaElement.getScale());
       if (index >= nullableDecimal38Vector.getValueCapacity()) {
         return false;
       }
@@ -170,16 +176,26 @@ public class VarLengthColumnReaders {
     }
 
     @Override
-    public boolean setSafe(int index, byte[] bytes, int start, int length) {
+    public boolean setSafe(int index, ByteBuf bytebuf, int start, int length) {
       boolean success;
       if(index >= varCharVector.getValueCapacity()) return false;
 
       if (usingDictionary) {
-        success = varCharVector.getMutator().setSafe(index, currDictValToWrite.getBytes(),
-            0, currDictValToWrite.length());
+        ByteBuf b = Unpooled.wrappedBuffer(currDictVal.toByteBuffer());
+        int st=0;
+        int len=currDictVal.length();
+        VarCharHolder holder = new VarCharHolder();
+        holder.buffer=b;
+        holder.start=0;
+        holder.end=currDictVal.length();
+        success = varCharVector.getMutator().setSafe(index, holder);
       }
       else {
-        success = varCharVector.getMutator().setSafe(index, bytes, start, length);
+        VarCharHolder holder = new VarCharHolder();
+        holder.buffer=bytebuf;
+        holder.start=start;
+        holder.end=start+length;
+        success = varCharVector.getMutator().setSafe(index, holder);
       }
       return success;
     }
@@ -204,16 +220,28 @@ public class VarLengthColumnReaders {
       nullableVarCharVector = v;
     }
 
-    public boolean setSafe(int index, byte[] value, int start, int length) {
+    public boolean setSafe(int index, ByteBuf value, int start, int length) {
       boolean success;
       if(index >= nullableVarCharVector.getValueCapacity()) return false;
 
       if (usingDictionary) {
-        success = nullableVarCharVector.getMutator().setSafe(index, currDictValToWrite.getBytes(),
-            0, currDictValToWrite.length());
+        ByteBuf b = Unpooled.wrappedBuffer(currDictVal.toByteBuffer());
+        int st=0;
+        int len=currDictVal.length();
+        NullableVarCharHolder holder = new NullableVarCharHolder();
+        holder.buffer=b;
+        holder.start=0;
+        holder.end=currDictVal.length();
+        holder.isSet=1;
+        success = nullableVarCharVector.getMutator().setSafe(index, holder);
       }
       else {
-        success = nullableVarCharVector.getMutator().setSafe(index, value, start, length);
+        NullableVarCharHolder holder = new NullableVarCharHolder();
+        holder.buffer=value;
+        holder.start=start;
+        holder.end=start+length;
+        holder.isSet=1;
+        success = nullableVarCharVector.getMutator().setSafe(index, holder);
       }
       return success;
     }
@@ -237,16 +265,26 @@ public class VarLengthColumnReaders {
     }
 
     @Override
-    public boolean setSafe(int index, byte[] bytes, int start, int length) {
+    public boolean setSafe(int index, ByteBuf value, int start, int length) {
       boolean success;
       if(index >= varBinaryVector.getValueCapacity()) return false;
 
       if (usingDictionary) {
-        success = varBinaryVector.getMutator().setSafe(index, currDictValToWrite.getBytes(),
-            0, currDictValToWrite.length());
+        ByteBuf b = Unpooled.wrappedBuffer(currDictVal.toByteBuffer());
+        int st=0;
+        int len=currDictVal.length();
+        VarBinaryHolder holder = new VarBinaryHolder();
+        holder.buffer=b;
+        holder.start=0;
+        holder.end=currDictVal.length();
+        success = varBinaryVector.getMutator().setSafe(index, holder);
       }
       else {
-        success = varBinaryVector.getMutator().setSafe(index, bytes, start, length);
+        VarBinaryHolder holder = new VarBinaryHolder();
+        holder.buffer=value;
+        holder.start=start;
+        holder.end=start+length;
+        success = varBinaryVector.getMutator().setSafe(index, holder);
       }
       return success;
     }
@@ -271,16 +309,28 @@ public class VarLengthColumnReaders {
       nullableVarBinaryVector = v;
     }
 
-    public boolean setSafe(int index, byte[] value, int start, int length) {
+    public boolean setSafe(int index, ByteBuf value, int start, int length) {
       boolean success;
       if(index >= nullableVarBinaryVector.getValueCapacity()) return false;
 
       if (usingDictionary) {
-        success = nullableVarBinaryVector.getMutator().setSafe(index, currDictValToWrite.getBytes(),
-            0, currDictValToWrite.length());
+        ByteBuf b = Unpooled.wrappedBuffer(currDictVal.toByteBuffer());
+        int st=0;
+        int len=currDictVal.length();
+        NullableVarBinaryHolder holder = new NullableVarBinaryHolder();
+        holder.buffer=b;
+        holder.start=0;
+        holder.end=currDictVal.length();
+        holder.isSet=1;
+        success = nullableVarBinaryVector.getMutator().setSafe(index, holder);
       }
       else {
-        success = nullableVarBinaryVector.getMutator().setSafe(index, value, start, length);
+        NullableVarBinaryHolder holder = new NullableVarBinaryHolder();
+        holder.buffer=value;
+        holder.start=start;
+        holder.end=start+length;
+        holder.isSet=1;
+        success = nullableVarBinaryVector.getMutator().setSafe(index, holder);
       }
       return  success;
     }

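The setSafe() changes above replace a (byte[], start, length) copy with a holder that points at a region of the page ByteBuf (holder.buffer, holder.start, holder.end). A small, self-contained sketch of addressing such a region on a ByteBuf without copying (illustrative only, not part of the commit):

import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;

public class HolderRegionSketch {
  public static void main(String[] args) {
    ByteBuf page = Unpooled.wrappedBuffer("hello world".getBytes());
    int start = 6, length = 5;                   // the value "world" inside the page
    ByteBuf value = page.slice(start, length);   // shares memory with the page, no copy
    byte[] out = new byte[value.readableBytes()];
    value.getBytes(0, out);
    System.out.println(new String(out));         // world
    page.release();
  }
}
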
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/02e4824e/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/VarLengthValuesColumn.java
----------------------------------------------------------------------
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/VarLengthValuesColumn.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/VarLengthValuesColumn.java
index 092c186..7f16c64 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/VarLengthValuesColumn.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/VarLengthValuesColumn.java
@@ -17,6 +17,7 @@
  
******************************************************************************/
 package org.apache.drill.exec.store.parquet.columnreaders;
 
+import io.netty.buffer.ByteBuf;
 import org.apache.drill.common.exceptions.ExecutionSetupException;
 import org.apache.drill.exec.vector.ValueVector;
 import org.apache.drill.exec.vector.VariableWidthVector;
@@ -48,7 +49,7 @@ public abstract class VarLengthValuesColumn<V extends ValueVector> extends VarLe
     }
   }
 
-  public abstract boolean setSafe(int index, byte[] bytes, int start, int length);
+  public abstract boolean setSafe(int index, ByteBuf bytes, int start, int length);
 
   @Override
   protected void readField(long recordToRead) {
@@ -79,7 +80,7 @@ public abstract class VarLengthValuesColumn<V extends ValueVector> extends VarLe
   protected boolean readAndStoreValueSizeInformation() throws IOException {
    // re-purposing this field here for length in BYTES to prevent repetitive multiplication/division
     try {
-    dataTypeLengthInBits = BytesUtils.readIntLittleEndian(pageReader.pageDataByteArray,
+    dataTypeLengthInBits = BytesUtils.readIntLittleEndian(pageReader.pageDataByteArray.nioBuffer(),
         (int) pageReader.readyToReadPosInBytes);
     } catch (Throwable t) {
       throw t;

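readAndStoreValueSizeInformation() above now reads the 4-byte little-endian length prefix of each variable-length value straight out of the page's NIO view (pageDataByteArray.nioBuffer()) rather than a byte[] copy. A plain-JDK sketch of that little-endian read (illustrative only, not part of the commit):

import java.nio.ByteBuffer;
import java.nio.ByteOrder;

public class LittleEndianLengthSketch {
  public static void main(String[] args) {
    // A PLAIN-encoded var-length value: 4-byte little-endian length, then the bytes
    ByteBuffer page = ByteBuffer.wrap(new byte[] {5, 0, 0, 0, 'h', 'e', 'l', 'l', 'o'});
    int valueLength = page.order(ByteOrder.LITTLE_ENDIAN).getInt(0);
    System.out.println(valueLength);   // 5
  }
}
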
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/02e4824e/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet2/DrillParquetReader.java
----------------------------------------------------------------------
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet2/DrillParquetReader.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet2/DrillParquetReader.java
index aaeb536..0d2a225 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet2/DrillParquetReader.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet2/DrillParquetReader.java
@@ -22,6 +22,7 @@ import com.google.common.collect.Lists;
 import org.apache.drill.common.exceptions.ExecutionSetupException;
 import org.apache.drill.common.expression.PathSegment;
 import org.apache.drill.common.expression.SchemaPath;
+import org.apache.drill.exec.ops.OperatorContext;
 import org.apache.drill.exec.physical.impl.OutputMutator;
 import org.apache.drill.exec.store.RecordReader;
 import org.apache.drill.exec.store.parquet.RowGroupReadEntry;
@@ -64,6 +65,8 @@ public class DrillParquetReader implements RecordReader {
   private DrillParquetRecordMaterializer recordMaterializer;
   private int recordCount;
   private List<ValueVector> primitiveVectors;
+  private OperatorContext operatorContext;
+
 
  public DrillParquetReader(ParquetMetadata footer, RowGroupReadEntry entry, List<SchemaPath> columns, Configuration conf) {
     this.footer = footer;
@@ -201,4 +204,10 @@ public class DrillParquetReader implements RecordReader {
   @Override
   public void cleanup() {
   }
+
+  public void setOperatorContext(OperatorContext operatorContext) {
+    this.operatorContext = operatorContext;
+  }
+
+
 }

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/02e4824e/exec/java-exec/src/main/java/org/apache/drill/exec/store/pojo/PojoRecordReader.java
----------------------------------------------------------------------
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/store/pojo/PojoRecordReader.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/store/pojo/PojoRecordReader.java
index 1ebd1f5..9a5920d 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/store/pojo/PojoRecordReader.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/store/pojo/PojoRecordReader.java
@@ -23,6 +23,7 @@ import java.util.Iterator;
 
 import org.apache.drill.common.exceptions.ExecutionSetupException;
 import org.apache.drill.exec.exception.SchemaChangeException;
+import org.apache.drill.exec.ops.OperatorContext;
 import org.apache.drill.exec.physical.impl.OutputMutator;
 import org.apache.drill.exec.store.RecordReader;
 import org.apache.drill.exec.store.pojo.Writers.BitWriter;
@@ -47,12 +48,21 @@ public class PojoRecordReader<T> implements RecordReader{
   private PojoWriter[] writers;
   private boolean doCurrent;
   private T currentPojo;
+  private OperatorContext operatorContext;
 
   public PojoRecordReader(Class<T> pojoClass, Iterator<T> iterator){
     this.pojoClass = pojoClass;
     this.iterator = iterator;
   }
 
+  public OperatorContext getOperatorContext() {
+    return operatorContext;
+  }
+
+  public void setOperatorContext(OperatorContext operatorContext) {
+    this.operatorContext = operatorContext;
+  }
+
   @Override
   public void setup(OutputMutator output) throws ExecutionSetupException {
     try{

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/02e4824e/exec/java-exec/src/main/java/org/apache/drill/exec/store/text/DrillTextRecordReader.java
----------------------------------------------------------------------
diff --git 
a/exec/java-exec/src/main/java/org/apache/drill/exec/store/text/DrillTextRecordReader.java
 
b/exec/java-exec/src/main/java/org/apache/drill/exec/store/text/DrillTextRecordReader.java
index 28babd1..68921a2 100644
--- 
a/exec/java-exec/src/main/java/org/apache/drill/exec/store/text/DrillTextRecordReader.java
+++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/store/text/DrillTextRecordReader.java
@@ -31,6 +31,7 @@ import org.apache.drill.exec.ExecConstants;
 import org.apache.drill.exec.exception.SchemaChangeException;
 import org.apache.drill.exec.expr.TypeHelper;
 import org.apache.drill.exec.ops.FragmentContext;
+import org.apache.drill.exec.ops.OperatorContext;
 import org.apache.drill.exec.physical.impl.OutputMutator;
 import org.apache.drill.exec.record.MaterializedField;
 import org.apache.drill.exec.store.RecordReader;
@@ -57,7 +58,8 @@ public class DrillTextRecordReader implements RecordReader {
   private byte delimiter;
   private int targetRecordCount;
   private FieldReference ref = new FieldReference(COL_NAME);
-  private FragmentContext context;
+  private FragmentContext fragmentContext;
+  private OperatorContext operatorContext;
   private RepeatedVarCharVector vector;
   private List<Integer> columnIds = Lists.newArrayList();
   private LongWritable key;
@@ -67,7 +69,7 @@ public class DrillTextRecordReader implements RecordReader {
   private boolean first = true;
 
  public DrillTextRecordReader(FileSplit split, FragmentContext context, char delimiter, List<SchemaPath> columns) {
-    this.context = context;
+    this.fragmentContext = context;
     this.delimiter = (byte) delimiter;
     boolean getEntireRow = false;
 
@@ -107,6 +109,14 @@ public class DrillTextRecordReader implements RecordReader {
     }
   }
 
+  public OperatorContext getOperatorContext() {
+    return operatorContext;
+  }
+
+  public void setOperatorContext(OperatorContext operatorContext) {
+    this.operatorContext = operatorContext;
+  }
+
   @Override
   public void setup(OutputMutator output) throws ExecutionSetupException {
    MaterializedField field = MaterializedField.create(ref, Types.repeated(TypeProtos.MinorType.VARCHAR));

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/02e4824e/exec/java-exec/src/main/java/parquet/hadoop/CodecFactoryExposer.java
----------------------------------------------------------------------
diff --git 
a/exec/java-exec/src/main/java/parquet/hadoop/CodecFactoryExposer.java 
b/exec/java-exec/src/main/java/parquet/hadoop/CodecFactoryExposer.java
index cb36102..7868a53 100644
--- a/exec/java-exec/src/main/java/parquet/hadoop/CodecFactoryExposer.java
+++ b/exec/java-exec/src/main/java/parquet/hadoop/CodecFactoryExposer.java
@@ -18,9 +18,20 @@
 package parquet.hadoop;
 
 import java.io.IOException;
+import java.io.OutputStream;
+import java.nio.ByteBuffer;
+import java.nio.channels.Channels;
+import java.nio.channels.WritableByteChannel;
+import java.util.HashMap;
+import java.util.Map;
 
+import io.netty.buffer.ByteBuf;
+import org.apache.drill.common.exceptions.DrillRuntimeException;
 import org.apache.hadoop.conf.Configuration;
 
+import org.apache.hadoop.io.compress.CompressionCodec;
+import org.apache.hadoop.io.compress.DirectDecompressionCodec;
+import org.apache.hadoop.util.ReflectionUtils;
 import parquet.bytes.BytesInput;
 import parquet.hadoop.CodecFactory.BytesDecompressor;
 import parquet.hadoop.metadata.CompressionCodecName;
@@ -28,9 +39,11 @@ import parquet.hadoop.metadata.CompressionCodecName;
 public class CodecFactoryExposer{
 
   private CodecFactory codecFactory;
+  private final Map<String, 
org.apache.hadoop.io.compress.DirectDecompressionCodec> codecByName = new 
HashMap<String, org.apache.hadoop.io.compress.DirectDecompressionCodec>();
+  private Configuration configuration;
 
   public CodecFactoryExposer(Configuration config){
-    codecFactory = new CodecFactory(config);
+    codecFactory = new CodecFactory(config);configuration = config;
   }
 
   public CodecFactory getCodecFactory() {
@@ -41,7 +54,84 @@ public class CodecFactoryExposer{
    return codecFactory.getDecompressor(codecName).decompress(bytes, uncompressedSize);
   }
 
-  public BytesDecompressor getDecompressor(CompressionCodecName codec) {
-    return codecFactory.getDecompressor(codec);
+  public BytesInput getBytesInput(ByteBuf uncompressedByteBuf, int uncompressedSize) throws IOException {
+    ByteBuffer outBuffer=uncompressedByteBuf.nioBuffer(0, uncompressedSize);
+    return new HadoopByteBufBytesInput(outBuffer, 0, outBuffer.limit());
+  }
+
+  public BytesInput decompress(CompressionCodecName codecName,
+                               ByteBuf compressedByteBuf,
+                               ByteBuf uncompressedByteBuf,
+                               int compressedSize,
+                               int uncompressedSize) throws IOException {
+    ByteBuffer inpBuffer=compressedByteBuf.nioBuffer(0, compressedSize);
+    ByteBuffer outBuffer=uncompressedByteBuf.nioBuffer(0, uncompressedSize);
+    CompressionCodec c = getCodec(codecName);
+
+    Class<?> cx = c.getClass();
+    ClassLoader l = cx.getClassLoader();
+    Class<?>[] inf = cx.getInterfaces();
+
+    DirectDecompressionCodec d = (DirectDecompressionCodec)c;
+
+    if(d!=null) {
+      d.createDirectDecompressor().decompress(inpBuffer, outBuffer);
+    }else{
+      throw new DrillRuntimeException("Cannot create a decompression codec for 
codec "+codecName.name());
+    }
+    return new HadoopByteBufBytesInput(outBuffer, 0, outBuffer.limit());
+  }
+
+  private DirectDecompressionCodec getCodec(CompressionCodecName codecName) {
+    String codecClassName = codecName.getHadoopCompressionCodecClassName();
+    if (codecClassName == null) {
+      return null;
+    }
+    DirectDecompressionCodec codec = codecByName.get(codecClassName);
+    if (codec != null) {
+      return codec;
+    }
+
+    try {
+      Class<?> codecClass = Class.forName(codecClassName);
+      codec = (DirectDecompressionCodec)ReflectionUtils.newInstance(codecClass, configuration);
+      codecByName.put(codecClassName, codec);
+      return codec;
+    } catch (ClassNotFoundException e) {
+      throw new BadConfigurationException("Class " + codecClassName + " was not found", e);
+    }
+  }
+
+  public class HadoopByteBufBytesInput extends BytesInput{
+
+    private final ByteBuffer byteBuf;
+    private final int length;
+    private final int offset;
+
+    private HadoopByteBufBytesInput(ByteBuffer byteBuf, int offset, int length) {
+      super();
+      this.byteBuf = byteBuf;
+      this.offset = offset;
+      this.length = length;
+    }
+
+    public void writeAllTo(OutputStream out) throws IOException {
+      final WritableByteChannel outputChannel = Channels.newChannel(out);
+      byteBuf.position(offset);
+      ByteBuffer tempBuf = byteBuf.slice();
+      tempBuf.limit(length);
+      outputChannel.write(tempBuf);
+    }
+
+    public ByteBuffer toByteBuffer() throws IOException {
+      byteBuf.position(offset);
+      ByteBuffer buf = byteBuf.slice();
+      buf.limit(length);
+      return buf;
+    }
+
+    public long size() {
+      return length;
+    }
   }
 }

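CodecFactoryExposer.decompress() above relies on Hadoop 2.4's DirectDecompressionCodec, which decompresses between ByteBuffers without surfacing a byte[]. A rough sketch of that API (illustrative only, not part of the commit; it assumes the native Snappy library is available, and in practice both buffers would be direct buffers):

import java.io.IOException;
import java.nio.ByteBuffer;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.compress.DirectDecompressionCodec;
import org.apache.hadoop.io.compress.SnappyCodec;

public class DirectDecompressSketch {
  // Decompress a Snappy-compressed page buffer into a pre-sized output buffer.
  public static void decompressPage(ByteBuffer compressed, ByteBuffer uncompressed) throws IOException {
    SnappyCodec codec = new SnappyCodec();     // implements DirectDecompressionCodec in Hadoop 2.4+
    codec.setConf(new Configuration());
    DirectDecompressionCodec direct = codec;
    direct.createDirectDecompressor().decompress(compressed, uncompressed);
  }
}
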
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/02e4824e/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/writer/TestParquetWriter.java
----------------------------------------------------------------------
diff --git 
a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/writer/TestParquetWriter.java
 
b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/writer/TestParquetWriter.java
index 89beeb0..d399fa9 100644
--- 
a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/writer/TestParquetWriter.java
+++ 
b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/writer/TestParquetWriter.java
@@ -27,6 +27,7 @@ import org.apache.drill.BaseTestQuery;
 import org.apache.drill.exec.ExecConstants;
 import org.apache.drill.exec.exception.SchemaChangeException;
 import org.apache.drill.exec.memory.TopLevelAllocator;
+import org.apache.drill.exec.planner.physical.PlannerSettings;
 import org.apache.drill.exec.record.BatchSchema;
 import org.apache.drill.exec.record.RecordBatchLoader;
 import org.apache.drill.exec.record.VectorWrapper;
@@ -219,7 +220,8 @@ public class TestParquetWriter extends BaseTestQuery {
       fs.delete(path, true);
     }
 
-    test("use dfs_test.tmp");
+    test("use dfs.tmp");
+//    test("ALTER SESSION SET `planner.add_producer_consumer` = false");
     String query = String.format("SELECT %s FROM %s", selection, inputTable);
     String create = "CREATE TABLE " + outputFile + " AS " + query;
    String validateQuery = String.format("SELECT %s FROM " + outputFile, validationSelection);

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/02e4824e/exec/jdbc/pom.xml
----------------------------------------------------------------------
diff --git a/exec/jdbc/pom.xml b/exec/jdbc/pom.xml
index b739db8..b9b3feb 100644
--- a/exec/jdbc/pom.xml
+++ b/exec/jdbc/pom.xml
@@ -127,7 +127,12 @@
       <dependencies>
         <dependency>
           <groupId>org.apache.hadoop</groupId>
-          <artifactId>hadoop-core</artifactId>
+          <artifactId>hadoop-client</artifactId>
+          <scope>test</scope>
+        </dependency>
+        <dependency>
+          <groupId>org.apache.hadoop</groupId>
+          <artifactId>hadoop-common</artifactId>
           <scope>test</scope>
         </dependency>
       </dependencies>
@@ -138,6 +143,13 @@
         <dependency>
           <groupId>org.apache.hadoop</groupId>
           <artifactId>hadoop-core</artifactId>
+          <version>2.4.1-mapr-4.0.0-SNAPSHOT</version>
+          <exclusions>
+            <exclusion>
+              <artifactId>commons-logging</artifactId>
+              <groupId>commons-logging</groupId>
+            </exclusion>
+          </exclusions>
           <scope>test</scope>
         </dependency>
       </dependencies>
