DRILL-1058: Read complex types in parquet

Project: http://git-wip-us.apache.org/repos/asf/incubator-drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-drill/commit/0d6befca
Tree: http://git-wip-us.apache.org/repos/asf/incubator-drill/tree/0d6befca
Diff: http://git-wip-us.apache.org/repos/asf/incubator-drill/diff/0d6befca

Branch: refs/heads/master
Commit: 0d6befca0bd98ad14cd2bb6700cdee5172cd8f90
Parents: 48229ec
Author: Steven Phillips <[email protected]>
Authored: Sun May 18 00:46:02 2014 -0700
Committer: Jacques Nadeau <[email protected]>
Committed: Tue Jul 29 21:42:30 2014 -0700

----------------------------------------------------------------------
 distribution/src/resources/hadoop-excludes.txt  |   1 +
 .../main/codegen/templates/ComplexWriters.java  |   4 +
 .../codegen/templates/NullableValueVectors.java |  15 +-
 .../codegen/templates/RepeatedValueVectors.java |   8 +
 .../templates/VariableLengthVectors.java        |   4 +
 .../org/apache/drill/exec/ExecConstants.java    |   2 +
 .../drill/exec/physical/impl/ScanBatch.java     |  12 +
 .../server/options/SystemOptionManager.java     |   1 +
 .../store/parquet/ParquetScanBatchCreator.java  |  46 +-
 .../parquet2/DrillParquetGroupConverter.java    | 439 +++++++++++++++++++
 .../exec/store/parquet2/DrillParquetReader.java | 204 +++++++++
 .../DrillParquetRecordMaterializer.java         |  57 +++
 .../drill/exec/vector/BaseDataValueVector.java  |   9 +
 .../drill/exec/vector/BaseValueVector.java      |   3 +
 .../apache/drill/exec/vector/ObjectVector.java  |   9 +
 .../drill/exec/vector/VariableWidthVector.java  |   2 +
 .../vector/complex/AbstractContainerVector.java |   4 +
 .../drill/exec/vector/complex/MapVector.java    |  16 +
 .../exec/vector/complex/RepeatedListVector.java |  14 +
 .../exec/vector/complex/RepeatedMapVector.java  |  17 +
 .../complex/impl/VectorContainerWriter.java     |   4 +
 .../parquet/hadoop/CodecFactoryExposer.java     |   5 +
 .../parquet/hadoop/ColumnChunkIncReadStore.java | 190 ++++++++
 .../org/apache/drill/ParquetSchemaMerge.java    |  48 ++
 .../org/apache/drill/TestExampleQueries.java    |   7 +
 .../src/test/resources/parquet/complex.parquet  | Bin 0 -> 862 bytes
 26 files changed, 1111 insertions(+), 10 deletions(-)
----------------------------------------------------------------------
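
For context: this change introduces a second Parquet reader (the new parquet2 package above) that can materialize nested and repeated columns into Drill's complex vectors. A query of the sort this enables is sketched below against the bundled complex.parquet test file; the actual query added in TestExampleQueries is not reproduced in this excerpt, and use of the classpath (`cp`) storage plugin here is an assumption:

    SELECT * FROM cp.`parquet/complex.parquet`;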


http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/0d6befca/distribution/src/resources/hadoop-excludes.txt
----------------------------------------------------------------------
diff --git a/distribution/src/resources/hadoop-excludes.txt b/distribution/src/resources/hadoop-excludes.txt
index 36b5703..255bed4 100644
--- a/distribution/src/resources/hadoop-excludes.txt
+++ b/distribution/src/resources/hadoop-excludes.txt
@@ -12,3 +12,4 @@ eclipse
 common
 guava
 servlet
+parquet

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/0d6befca/exec/java-exec/src/main/codegen/templates/ComplexWriters.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/codegen/templates/ComplexWriters.java b/exec/java-exec/src/main/codegen/templates/ComplexWriters.java
index 72ff135..ce839ef 100644
--- a/exec/java-exec/src/main/codegen/templates/ComplexWriters.java
+++ b/exec/java-exec/src/main/codegen/templates/ComplexWriters.java
@@ -74,6 +74,7 @@ public class ${eName}WriterImpl extends AbstractFieldWriter {
     if(ok()){
      // update to inform(addSafe) once available for all repeated vector types for holders.
       inform(mutator.addSafe(idx(), h));
+      vector.setCurrentValueCount(idx());
     }
   }
   
@@ -81,6 +82,7 @@ public class ${eName}WriterImpl extends AbstractFieldWriter {
     if(ok()){
      // update to inform(addSafe) once available for all repeated vector types for holders.
       inform(mutator.addSafe(idx(), h));
+      vector.setCurrentValueCount(idx());
     }
   }
 
@@ -96,6 +98,7 @@ public class ${eName}WriterImpl extends AbstractFieldWriter {
     if(ok()){
      // update to inform(setSafe) once available for all vector types for holders.
       inform(mutator.setSafe(idx(), h));
+      vector.setCurrentValueCount(idx());
     }
   }
   
@@ -103,6 +106,7 @@ public class ${eName}WriterImpl extends AbstractFieldWriter {
     if(ok()){
      // update to inform(setSafe) once available for all vector types for holders.
       inform(mutator.setSafe(idx(), h));
+      vector.setCurrentValueCount(idx());
     }
   }
   

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/0d6befca/exec/java-exec/src/main/codegen/templates/NullableValueVectors.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/codegen/templates/NullableValueVectors.java b/exec/java-exec/src/main/codegen/templates/NullableValueVectors.java
index 4873f2a..6876cab 100644
--- a/exec/java-exec/src/main/codegen/templates/NullableValueVectors.java
+++ b/exec/java-exec/src/main/codegen/templates/NullableValueVectors.java
@@ -61,7 +61,15 @@ public final class ${className} extends BaseValueVector implements <#if type.maj
   public int getValueCapacity(){
     return Math.min(bits.getValueCapacity(), values.getValueCapacity());
   }
-  
+
+  public int getCurrentValueCount() {
+    return values.getCurrentValueCount();
+  }
+
+  public void setCurrentValueCount(int count) {
+    values.setCurrentValueCount(count);
+  }
+
   @Override
   public ByteBuf[] getBuffers() {
    ByteBuf[] buffers = ObjectArrays.concat(bits.getBuffers(), values.getBuffers(), ByteBuf.class);
@@ -143,6 +151,11 @@ public final class ${className} extends BaseValueVector implements <#if type.maj
     return values.getByteCapacity();
   }
 
+  @Override
+  public int getCurrentSizeInBytes(){
+    return values.getCurrentSizeInBytes();
+  }
+
   <#else>
   @Override
   public SerializedField getMetadata() {

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/0d6befca/exec/java-exec/src/main/codegen/templates/RepeatedValueVectors.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/codegen/templates/RepeatedValueVectors.java b/exec/java-exec/src/main/codegen/templates/RepeatedValueVectors.java
index b283d19..7cbc50c 100644
--- a/exec/java-exec/src/main/codegen/templates/RepeatedValueVectors.java
+++ b/exec/java-exec/src/main/codegen/templates/RepeatedValueVectors.java
@@ -67,6 +67,14 @@ package org.apache.drill.exec.vector;
     return Math.min(values.getValueCapacity(), offsets.getValueCapacity() - 1);
   }
 
+  public int getCurrentValueCount() {
+    return values.getCurrentValueCount();
+  }
+
+  public void setCurrentValueCount(int count) {
+    values.setCurrentValueCount(count);
+  }
+  
   public int getBufferSize(){
     return offsets.getBufferSize() + values.getBufferSize();
   }

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/0d6befca/exec/java-exec/src/main/codegen/templates/VariableLengthVectors.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/codegen/templates/VariableLengthVectors.java b/exec/java-exec/src/main/codegen/templates/VariableLengthVectors.java
index 1d30acb..b0af1fe 100644
--- a/exec/java-exec/src/main/codegen/templates/VariableLengthVectors.java
+++ b/exec/java-exec/src/main/codegen/templates/VariableLengthVectors.java
@@ -78,6 +78,10 @@ public final class ${minor.class}Vector extends BaseDataValueVector implements V
   public int getByteCapacity(){
     return data.capacity(); 
   }
+
+  public int getCurrentSizeInBytes() {
+    return offsetVector.getAccessor().get(currentValueCount);
+  }
   
   /**
    * Return the number of bytes contained in the current var len byte vector.

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/0d6befca/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java b/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java
index 595fdd7..f2a84ee 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/ExecConstants.java
@@ -81,6 +81,8 @@ public interface ExecConstants {
  public static final OptionValidator OUTPUT_FORMAT_VALIDATOR = new StringValidator(OUTPUT_FORMAT_OPTION, "parquet");
  public static final String PARQUET_BLOCK_SIZE = "store.parquet.block-size";
  public static final OptionValidator PARQUET_BLOCK_SIZE_VALIDATOR = new LongValidator(PARQUET_BLOCK_SIZE, 512*1024*1024);
+  public static String PARQUET_NEW_RECORD_READER = "store.parquet.use_new_reader";
+  public static OptionValidator PARQUET_RECORD_READER_IMPLEMENTATION_VALIDATOR = new BooleanValidator(PARQUET_NEW_RECORD_READER, false);
 
 
 

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/0d6befca/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScanBatch.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScanBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScanBatch.java
index 21a580b..eaf26d1 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScanBatch.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScanBatch.java
@@ -47,12 +47,17 @@ import org.apache.drill.exec.record.WritableBatch;
 import org.apache.drill.exec.record.selection.SelectionVector2;
 import org.apache.drill.exec.record.selection.SelectionVector4;
 import org.apache.drill.exec.store.RecordReader;
+import org.apache.drill.exec.util.BatchPrinter;
+import org.apache.drill.exec.util.VectorUtil;
 import org.apache.drill.exec.vector.AllocationHelper;
 import org.apache.drill.exec.vector.NullableVarCharVector;
 import org.apache.drill.exec.vector.ValueVector;
 
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
+import org.apache.drill.exec.vector.complex.MapVector;
+import org.apache.drill.exec.vector.complex.impl.ComplexWriterImpl;
+import org.apache.drill.exec.vector.complex.writer.BaseWriter.ComplexWriter;
 
 /**
  * Record batch used for a particular scan. Operators against one or more
@@ -136,6 +141,7 @@ public class ScanBatch implements RecordBatch {
     if (done) {
       return IterOutcome.NONE;
     }
+    long t1 = System.nanoTime();
     oContext.getStats().startProcessing();
     try {
       mutator.allocate(MAX_RECORD_CNT);
@@ -177,8 +183,14 @@ public class ScanBatch implements RecordBatch {
       if (mutator.isNewSchema()) {
         container.buildSchema(SelectionVectorMode.NONE);
         schema = container.getSchema();
+        long t2 = System.nanoTime();
+//        System.out.println((t2 - t1) / recordCount);
+//        BatchPrinter.printBatch(this, "\t");
         return IterOutcome.OK_NEW_SCHEMA;
       } else {
+        long t2 = System.nanoTime();
+//        System.out.println((t2 - t1) / recordCount);
+//        BatchPrinter.printBatch(this, "\t");
         return IterOutcome.OK;
       }
     } catch (Exception ex) {

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/0d6befca/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SystemOptionManager.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SystemOptionManager.java b/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SystemOptionManager.java
index 6a8cc5e..e49b107 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SystemOptionManager.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SystemOptionManager.java
@@ -60,6 +60,7 @@ public class SystemOptionManager implements OptionManager{
       PlannerSettings.HASH_SINGLE_KEY,
       ExecConstants.OUTPUT_FORMAT_VALIDATOR,
       ExecConstants.PARQUET_BLOCK_SIZE_VALIDATOR,
+      ExecConstants.PARQUET_RECORD_READER_IMPLEMENTATION_VALIDATOR,
       ExecConstants.SLICE_TARGET_OPTION,
       ExecConstants.AFFINITY_FACTOR,
       ExecConstants.MAX_WIDTH_GLOBAL,
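
With the validator registered above, the new reader becomes a normal session/system option. A minimal sketch of toggling it, assuming Drill's usual ALTER SESSION syntax (the option name is taken from ExecConstants above); note that files containing complex columns are routed to the new reader regardless of this setting, as the ParquetScanBatchCreator change below shows:

    ALTER SESSION SET `store.parquet.use_new_reader` = true;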

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/0d6befca/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetScanBatchCreator.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetScanBatchCreator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetScanBatchCreator.java
index b4f02fb..ae73af3 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetScanBatchCreator.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetScanBatchCreator.java
@@ -38,15 +38,24 @@ import com.google.common.base.Preconditions;
 import com.google.common.collect.Lists;
 
 import org.apache.drill.exec.store.parquet.columnreaders.ParquetRecordReader;
+import org.apache.drill.exec.store.parquet2.DrillParquetReader;
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 
 import parquet.hadoop.ParquetFileReader;
 import parquet.hadoop.metadata.ParquetMetadata;
+import parquet.schema.MessageType;
+import parquet.schema.Type;
+
 
 public class ParquetScanBatchCreator implements BatchCreator<ParquetRowGroupScan>{
   static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(ParquetScanBatchCreator.class);
 
+  private static final String ENABLE_BYTES_READ_COUNTER = "parquet.benchmark.bytes.read";
+  private static final String ENABLE_BYTES_TOTAL_COUNTER = "parquet.benchmark.bytes.total";
+  private static final String ENABLE_TIME_READ_COUNTER = "parquet.benchmark.time.read";
+
   @Override
   public RecordBatch getBatch(FragmentContext context, ParquetRowGroupScan rowGroupScan, List<RecordBatch> children) throws ExecutionSetupException {
     Preconditions.checkArgument(children.isEmpty());
@@ -74,7 +83,11 @@ public class ParquetScanBatchCreator implements BatchCreator<ParquetRowGroupScan
 
 
     FileSystem fs = rowGroupScan.getStorageEngine().getFileSystem().getUnderlying();
-    
+    Configuration conf = fs.getConf();
+    conf.setBoolean(ENABLE_BYTES_READ_COUNTER, false);
+    conf.setBoolean(ENABLE_BYTES_TOTAL_COUNTER, false);
+    conf.setBoolean(ENABLE_TIME_READ_COUNTER, false);
+
     // keep footers in a map to avoid re-reading them
     Map<String, ParquetMetadata> footers = new HashMap<String, ParquetMetadata>();
     int numParts = 0;
@@ -91,14 +104,19 @@ public class ParquetScanBatchCreator implements BatchCreator<ParquetRowGroupScan
           footers.put(e.getPath(),
               ParquetFileReader.readFooter( fs.getConf(), new Path(e.getPath())));
         }
-        readers.add(
-            new ParquetRecordReader(
-                context, e.getPath(), e.getRowGroupIndex(), fs,
-                rowGroupScan.getStorageEngine().getCodecFactoryExposer(),
-                footers.get(e.getPath()),
-                rowGroupScan.getColumns()
-            )
-        );
+        if (!context.getOptions().getOption(ExecConstants.PARQUET_NEW_RECORD_READER).bool_val && !isComplex(footers.get(e.getPath()))) {
+          readers.add(
+              new ParquetRecordReader(
+                  context, e.getPath(), e.getRowGroupIndex(), fs,
+                  rowGroupScan.getStorageEngine().getCodecFactoryExposer(),
+                  footers.get(e.getPath()),
+                  rowGroupScan.getColumns()
+              )
+          );
+        } else {
+          ParquetMetadata footer = footers.get(e.getPath());
+          readers.add(new DrillParquetReader(footer, e, columns, conf));
+        }
         if (rowGroupScan.getSelectionRoot() != null) {
           String[] r = rowGroupScan.getSelectionRoot().split("/");
           String[] p = e.getPath().split("/");
@@ -125,4 +143,14 @@ public class ParquetScanBatchCreator implements BatchCreator<ParquetRowGroupScan
 
     return new ScanBatch(rowGroupScan, context, readers.iterator(), partitionColumns, selectedPartitionColumns);
   }
+
+  private static boolean isComplex(ParquetMetadata footer) {
+    MessageType schema = footer.getFileMetaData().getSchema();
+    for (Type type : schema.getFields()) {
+      if (!type.isPrimitive()) {
+        return true;
+      }
+    }
+    return false;
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/0d6befca/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet2/DrillParquetGroupConverter.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet2/DrillParquetGroupConverter.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet2/DrillParquetGroupConverter.java
new file mode 100644
index 0000000..f9bac65
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet2/DrillParquetGroupConverter.java
@@ -0,0 +1,439 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.parquet2;
+
+import com.google.common.collect.Lists;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.Unpooled;
+
+import org.apache.drill.common.util.DecimalUtility;
+import org.apache.drill.exec.expr.holders.BigIntHolder;
+import org.apache.drill.exec.expr.holders.BitHolder;
+import org.apache.drill.exec.expr.holders.DateHolder;
+import org.apache.drill.exec.expr.holders.Decimal18Holder;
+import org.apache.drill.exec.expr.holders.Decimal28SparseHolder;
+import org.apache.drill.exec.expr.holders.Decimal38SparseHolder;
+import org.apache.drill.exec.expr.holders.Decimal9Holder;
+import org.apache.drill.exec.expr.holders.Float4Holder;
+import org.apache.drill.exec.expr.holders.Float8Holder;
+import org.apache.drill.exec.expr.holders.IntHolder;
+import org.apache.drill.exec.expr.holders.TimeHolder;
+import org.apache.drill.exec.expr.holders.TimeStampHolder;
+import org.apache.drill.exec.expr.holders.VarBinaryHolder;
+import org.apache.drill.exec.expr.holders.VarCharHolder;
+import org.apache.drill.exec.store.ParquetOutputRecordWriter;
+import org.apache.drill.exec.store.parquet.columnreaders.NullableFixedByteAlignedReaders;
+import org.apache.drill.exec.vector.complex.impl.ComplexWriterImpl;
+import org.apache.drill.exec.vector.complex.writer.BaseWriter.MapWriter;
+import org.apache.drill.exec.vector.complex.writer.BigIntWriter;
+import org.apache.drill.exec.vector.complex.writer.BitWriter;
+import org.apache.drill.exec.vector.complex.writer.DateWriter;
+import org.apache.drill.exec.vector.complex.writer.Decimal18Writer;
+import org.apache.drill.exec.vector.complex.writer.Decimal28SparseWriter;
+import org.apache.drill.exec.vector.complex.writer.Decimal38SparseWriter;
+import org.apache.drill.exec.vector.complex.writer.Decimal9Writer;
+import org.apache.drill.exec.vector.complex.writer.Float4Writer;
+import org.apache.drill.exec.vector.complex.writer.Float8Writer;
+import org.apache.drill.exec.vector.complex.writer.IntWriter;
+import org.apache.drill.exec.vector.complex.writer.TimeStampWriter;
+import org.apache.drill.exec.vector.complex.writer.TimeWriter;
+import org.apache.drill.exec.vector.complex.writer.VarBinaryWriter;
+import org.apache.drill.exec.vector.complex.writer.VarCharWriter;
+import org.joda.time.DateTimeUtils;
+
+import parquet.io.api.Binary;
+import parquet.io.api.Converter;
+import parquet.io.api.GroupConverter;
+import parquet.io.api.PrimitiveConverter;
+import parquet.schema.DecimalMetadata;
+import parquet.schema.GroupType;
+import parquet.schema.MessageType;
+import parquet.schema.PrimitiveType;
+import parquet.schema.Type;
+import parquet.schema.Type.Repetition;
+
+import java.math.BigDecimal;
+import java.util.List;
+
+public class DrillParquetGroupConverter extends GroupConverter {
+
+  private List<Converter> converters;
+  private MapWriter mapWriter;
+
+  public DrillParquetGroupConverter(ComplexWriterImpl complexWriter, MessageType schema) {
+    this(complexWriter.rootAsMap(), schema);
+  }
+
+  public DrillParquetGroupConverter(MapWriter mapWriter, GroupType schema) {
+    this.mapWriter = mapWriter;
+    converters = Lists.newArrayList();
+    for (Type type : schema.getFields()) {
+      Repetition rep = type.getRepetition();
+      boolean isPrimitive = type.isPrimitive();
+      if (!isPrimitive) {
+        if (rep != Repetition.REPEATED) {
+          DrillParquetGroupConverter converter = new DrillParquetGroupConverter(mapWriter.map(type.getName()), type.asGroupType());
+          converters.add(converter);
+        } else {
+          DrillParquetGroupConverter converter = new DrillParquetGroupConverter(mapWriter.list(type.getName()).map(), type.asGroupType());
+          converters.add(converter);
+        }
+      } else {
+        PrimitiveConverter converter = getConverterForType(type.asPrimitiveType());
+        converters.add(converter);
+      }
+    }
+  }
+
+  private PrimitiveConverter getConverterForType(PrimitiveType type) {
+
+    String name = type.getName();
+    switch(type.getPrimitiveTypeName()) {
+      case INT32: {
+        if (type.getOriginalType() == null) {
+          IntWriter writer = type.getRepetition() == Repetition.REPEATED ? mapWriter.list(name).integer() : mapWriter.integer(name);
+          return new DrillIntConverter(writer);
+        }
+        switch(type.getOriginalType()) {
+          case DECIMAL: {
+            Decimal9Writer writer = type.getRepetition() == Repetition.REPEATED ? mapWriter.list(name).decimal9() : mapWriter.decimal9(name);
+            return new DrillDecimal9Converter(writer, type.getDecimalMetadata().getPrecision(), type.getDecimalMetadata().getScale());
+          }
+          case DATE: {
+            DateWriter writer = type.getRepetition() == Repetition.REPEATED ? mapWriter.list(name).date() : mapWriter.date(name);
+            return new DrillDateConverter(writer);
+          }
+          case TIME_MILLIS: {
+            TimeWriter writer = type.getRepetition() == Repetition.REPEATED ? mapWriter.list(name).time() : mapWriter.time(name);
+            return new DrillTimeConverter(writer);
+          }
+          default: {
+            throw new UnsupportedOperationException("Unsupported type: " + type.getOriginalType());
+          }
+        }
+      }
+      case INT64: {
+        if (type.getOriginalType() == null) {
+          BigIntWriter writer = type.getRepetition() == Repetition.REPEATED ? mapWriter.list(name).bigInt() : mapWriter.bigInt(name);
+          return new DrillBigIntConverter(writer);
+        }
+        switch(type.getOriginalType()) {
+          case DECIMAL: {
+            Decimal18Writer writer = type.getRepetition() == Repetition.REPEATED ? mapWriter.list(name).decimal18() : mapWriter.decimal18(name);
+            return new DrillDecimal18Converter(writer, type.getDecimalMetadata().getPrecision(), type.getDecimalMetadata().getScale());
+          }
+          case TIMESTAMP_MILLIS: {
+            TimeStampWriter writer = type.getRepetition() == Repetition.REPEATED ? mapWriter.list(name).timeStamp() : mapWriter.timeStamp(name);
+            return new DrillTimeStampConverter(writer);
+          }
+          default: {
+            throw new UnsupportedOperationException("Unsupported type " + type.getOriginalType());
+          }
+        }
+      }
+      case FLOAT: {
+        Float4Writer writer = type.getRepetition() == Repetition.REPEATED ? mapWriter.list(name).float4() : mapWriter.float4(name);
+        return new DrillFloat4Converter(writer);
+      }
+      case DOUBLE: {
+        Float8Writer writer = type.getRepetition() == Repetition.REPEATED ? mapWriter.list(name).float8() : mapWriter.float8(name);
+        return new DrillFloat8Converter(writer);
+      }
+      case BOOLEAN: {
+        BitWriter writer = type.getRepetition() == Repetition.REPEATED ? mapWriter.list(name).bit() : mapWriter.bit(name);
+        return new DrillBoolConverter(writer);
+      }
+      case BINARY: {
+        if (type.getOriginalType() == null) {
+          VarBinaryWriter writer = type.getRepetition() == Repetition.REPEATED ? mapWriter.list(name).varBinary() : mapWriter.varBinary(name);
+          return new DrillVarBinaryConverter(writer);
+        }
+        switch(type.getOriginalType()) {
+          case UTF8: {
+            VarCharWriter writer = type.getRepetition() == Repetition.REPEATED ? mapWriter.list(name).varChar() : mapWriter.varChar(name);
+            return new DrillVarCharConverter(writer);
+          }
+          case DECIMAL: {
+            DecimalMetadata metadata = type.getDecimalMetadata();
+            if (metadata.getPrecision() <= 28) {
+              Decimal28SparseWriter writer = type.getRepetition() == Repetition.REPEATED ? mapWriter.list(name).decimal28Sparse() : mapWriter.decimal28Sparse(name);
+              return new DrillBinaryToDecimal28Converter(writer, metadata.getPrecision(), metadata.getScale());
+            } else {
+              Decimal38SparseWriter writer = type.getRepetition() == Repetition.REPEATED ? mapWriter.list(name).decimal38Sparse() : mapWriter.decimal38Sparse(name);
+              return new DrillBinaryToDecimal38Converter(writer, metadata.getPrecision(), metadata.getScale());
+            }
+          }
+          default: {
+            throw new UnsupportedOperationException("Unsupported type " + type.getOriginalType());
+          }
+        }
+      }
+      default:
+        throw new UnsupportedOperationException("Unsupported type: " + type.getPrimitiveTypeName());
+    }
+  }
+
+  @Override
+  public Converter getConverter(int i) {
+    return converters.get(i);
+  }
+
+  @Override
+  public void start() {
+    mapWriter.start();
+  }
+
+  @Override
+  public void end() {
+    mapWriter.end();
+  }
+
+  public static class DrillIntConverter extends PrimitiveConverter {
+    private IntWriter writer;
+    private IntHolder holder = new IntHolder();
+
+    public DrillIntConverter(IntWriter writer) {
+      super();
+      this.writer = writer;
+    }
+
+    @Override
+    public void addInt(int value) {
+      holder.value = value;
+      writer.write(holder);
+    }
+  }
+
+  public static class DrillDecimal9Converter extends PrimitiveConverter {
+    private Decimal9Writer writer;
+    private Decimal9Holder holder = new Decimal9Holder();
+    int precision;
+    int scale;
+
+    public DrillDecimal9Converter(Decimal9Writer writer, int precision, int scale) {
+      this.writer = writer;
+      this.scale = scale;
+      this.precision = precision;
+    }
+
+    @Override
+    public void addInt(int value) {
+      holder.value = value;
+      writer.write(holder);
+    }
+  }
+
+  public static class DrillDateConverter extends PrimitiveConverter {
+    private DateWriter writer;
+    private DateHolder holder = new DateHolder();
+
+    public DrillDateConverter(DateWriter writer) {
+      this.writer = writer;
+    }
+
+    @Override
+    public void addInt(int value) {
+      holder.value = DateTimeUtils.fromJulianDay(value - ParquetOutputRecordWriter.JULIAN_DAY_EPOC - 0.5);
+      writer.write(holder);
+    }
+  }
+
+  public static class DrillTimeConverter extends PrimitiveConverter {
+    private TimeWriter writer;
+    private TimeHolder holder = new TimeHolder();
+
+    public DrillTimeConverter(TimeWriter writer) {
+      this.writer = writer;
+    }
+
+    @Override
+    public void addInt(int value) {
+      holder.value = value;
+      writer.write(holder);
+    }
+  }
+
+  public static class DrillBigIntConverter extends PrimitiveConverter {
+    private BigIntWriter writer;
+    private BigIntHolder holder = new BigIntHolder();
+
+    public DrillBigIntConverter(BigIntWriter writer) {
+      this.writer = writer;
+    }
+
+    @Override
+    public void addLong(long value) {
+      holder.value = value;
+      writer.write(holder);
+    }
+  }
+
+  public static class DrillTimeStampConverter extends PrimitiveConverter {
+    private TimeStampWriter writer;
+    private TimeStampHolder holder = new TimeStampHolder();
+
+    public DrillTimeStampConverter(TimeStampWriter writer) {
+      this.writer = writer;
+    }
+
+    @Override
+    public void addLong(long value) {
+      holder.value = value;
+      writer.write(holder);
+    }
+  }
+
+  public static class DrillDecimal18Converter extends PrimitiveConverter {
+    private Decimal18Writer writer;
+    private Decimal18Holder holder = new Decimal18Holder();
+
+    public DrillDecimal18Converter(Decimal18Writer writer, int precision, int scale) {
+      this.writer = writer;
+      holder.precision = precision;
+      holder.scale = scale;
+    }
+
+    @Override
+    public void addLong(long value) {
+      holder.value = value;
+      writer.write(holder);
+    }
+  }
+
+  public static class DrillFloat4Converter extends PrimitiveConverter {
+    private Float4Writer writer;
+    private Float4Holder holder = new Float4Holder();
+
+    public DrillFloat4Converter(Float4Writer writer) {
+      this.writer = writer;
+    }
+
+    @Override
+    public void addFloat(float value) {
+      holder.value = value;
+      writer.write(holder);
+    }
+  }
+
+  public static class DrillFloat8Converter extends PrimitiveConverter {
+    private Float8Writer writer;
+    private Float8Holder holder = new Float8Holder();
+
+    public DrillFloat8Converter(Float8Writer writer) {
+      this.writer = writer;
+    }
+
+    @Override
+    public void addDouble(double value) {
+      holder.value = value;
+      writer.write(holder);
+    }
+  }
+
+  public static class DrillBoolConverter extends PrimitiveConverter {
+    private BitWriter writer;
+    private BitHolder holder = new BitHolder();
+
+    public DrillBoolConverter(BitWriter writer) {
+      this.writer = writer;
+    }
+
+    @Override
+    public void addBoolean(boolean value) {
+      holder.value = value ? 1 : 0;
+      writer.write(holder);
+    }
+  }
+
+  public static class DrillVarBinaryConverter extends PrimitiveConverter {
+    private VarBinaryWriter writer;
+    private VarBinaryHolder holder = new VarBinaryHolder();
+
+    public DrillVarBinaryConverter(VarBinaryWriter writer) {
+      this.writer = writer;
+    }
+
+    @Override
+    public void addBinary(Binary value) {
+      ByteBuf buf = Unpooled.wrappedBuffer(value.toByteBuffer());
+      holder.buffer = buf;
+      holder.start = 0;
+      holder.end = value.length();
+      writer.write(holder);
+    }
+  }
+
+  public static class DrillVarCharConverter extends PrimitiveConverter {
+    private VarCharWriter writer;
+    private VarCharHolder holder = new VarCharHolder();
+
+    public DrillVarCharConverter(VarCharWriter writer) {
+      this.writer = writer;
+    }
+
+    @Override
+    public void addBinary(Binary value) {
+      ByteBuf buf = Unpooled.wrappedBuffer(value.toByteBuffer());
+      holder.buffer = buf;
+      holder.start = 0;
+      holder.end = value.length();
+      writer.write(holder);
+    }
+  }
+
+  public static class DrillBinaryToDecimal28Converter extends PrimitiveConverter {
+    private Decimal28SparseWriter writer;
+    private Decimal28SparseHolder holder = new Decimal28SparseHolder();
+
+    public DrillBinaryToDecimal28Converter(Decimal28SparseWriter writer, int precision, int scale) {
+      this.writer = writer;
+      holder.precision = precision;
+      holder.scale = scale;
+    }
+
+    @Override
+    public void addBinary(Binary value) {
+      BigDecimal bigDecimal = DecimalUtility.getBigDecimalFromByteArray(value.getBytes(), 0, value.length(), holder.scale);
+      ByteBuf buf = Unpooled.wrappedBuffer(new byte[28]);
+      DecimalUtility.getSparseFromBigDecimal(bigDecimal, buf, 0, holder.scale, holder.precision, Decimal28SparseHolder.nDecimalDigits);
+      holder.buffer = buf;
+      writer.write(holder);
+    }
+  }
+
+  public static class DrillBinaryToDecimal38Converter extends PrimitiveConverter {
+    private Decimal38SparseWriter writer;
+    private Decimal38SparseHolder holder = new Decimal38SparseHolder();
+
+    public DrillBinaryToDecimal38Converter(Decimal38SparseWriter writer, int precision, int scale) {
+      this.writer = writer;
+      holder.precision = precision;
+      holder.scale = scale;
+    }
+
+    @Override
+    public void addBinary(Binary value) {
+      BigDecimal bigDecimal = DecimalUtility.getBigDecimalFromByteArray(value.getBytes(), 0, value.length(), holder.scale);
+      ByteBuf buf = Unpooled.wrappedBuffer(new byte[38]);
+      DecimalUtility.getSparseFromBigDecimal(bigDecimal, buf, 0, holder.scale, holder.precision, Decimal38SparseHolder.nDecimalDigits);
+      holder.buffer = buf;
+      writer.write(holder);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/0d6befca/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet2/DrillParquetReader.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet2/DrillParquetReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet2/DrillParquetReader.java
new file mode 100644
index 0000000..aaeb536
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet2/DrillParquetReader.java
@@ -0,0 +1,204 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.parquet2;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
+import org.apache.drill.common.exceptions.ExecutionSetupException;
+import org.apache.drill.common.expression.PathSegment;
+import org.apache.drill.common.expression.SchemaPath;
+import org.apache.drill.exec.physical.impl.OutputMutator;
+import org.apache.drill.exec.store.RecordReader;
+import org.apache.drill.exec.store.parquet.RowGroupReadEntry;
+import org.apache.drill.exec.vector.BaseValueVector;
+import org.apache.drill.exec.vector.ValueVector;
+import org.apache.drill.exec.vector.VariableWidthVector;
+import org.apache.drill.exec.vector.complex.impl.VectorContainerWriter;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import parquet.hadoop.CodecFactoryExposer;
+import parquet.hadoop.ColumnChunkIncReadStore;
+import parquet.hadoop.metadata.BlockMetaData;
+import parquet.hadoop.metadata.ColumnChunkMetaData;
+import parquet.hadoop.metadata.ColumnPath;
+import parquet.hadoop.metadata.ParquetMetadata;
+import parquet.io.ColumnIOFactory;
+import parquet.io.InvalidRecordException;
+import parquet.io.MessageColumnIO;
+import parquet.schema.GroupType;
+import parquet.schema.MessageType;
+import parquet.schema.Type;
+
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+public class DrillParquetReader implements RecordReader {
+
+  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(DrillParquetReader.class);
+
+  private ParquetMetadata footer;
+  private MessageType schema;
+  private Configuration conf;
+  private RowGroupReadEntry entry;
+  private List<SchemaPath> columns;
+  private VectorContainerWriter writer;
+  private parquet.io.RecordReader<Void> recordReader;
+  private DrillParquetRecordMaterializer recordMaterializer;
+  private int recordCount;
+  private List<ValueVector> primitiveVectors;
+
+  public DrillParquetReader(ParquetMetadata footer, RowGroupReadEntry entry, List<SchemaPath> columns, Configuration conf) {
+    this.footer = footer;
+    this.conf = conf;
+    this.columns = columns;
+    this.entry = entry;
+  }
+
+  public static MessageType getProjection(MessageType schema, List<SchemaPath> columns) {
+    MessageType projection = null;
+    for (SchemaPath path : columns) {
+      List<String> segments = Lists.newArrayList();
+      PathSegment rootSegment = path.getRootSegment();
+      PathSegment seg = rootSegment;
+      String messageName = schema.getName();
+      while(seg != null){
+        if(seg.isNamed()) {
+          segments.add(seg.getNameSegment().getPath());
+        }
+        seg = seg.getChild();
+      }
+      String[] pathSegments = new String[segments.size()];
+      segments.toArray(pathSegments);
+      Type type = null;
+      try {
+        type = schema.getType(pathSegments);
+      } catch (InvalidRecordException e) {
+        logger.warn("Invalid record" , e);
+      }
+      if (type != null) {
+        Type t = getType(pathSegments, 0, schema);
+        if (projection == null) {
+          projection = new MessageType(messageName, t);
+        } else {
+          projection = projection.union(new MessageType(messageName, t));
+        }
+      }
+    }
+    return projection;
+  }
+
+  @Override
+  public void setup(OutputMutator output) throws ExecutionSetupException {
+
+    try {
+      schema = footer.getFileMetaData().getSchema();
+      MessageType projection = null;
+
+      if (columns == null || columns.size() == 0) {
+        projection = schema;
+      } else {
+        projection = getProjection(schema, columns);
+        if (projection == null) {
+          projection = schema;
+        }
+      }
+
+      logger.debug("Requesting schema {}", projection);
+
+      ColumnIOFactory factory = new ColumnIOFactory(false);
+      MessageColumnIO columnIO = factory.getColumnIO(projection, schema);
+      Map<ColumnPath, ColumnChunkMetaData> paths = new HashMap();
+
+      for (ColumnChunkMetaData md : footer.getBlocks().get(entry.getRowGroupIndex()).getColumns()) {
+        paths.put(md.getPath(), md);
+      }
+
+      CodecFactoryExposer codecFactoryExposer = new CodecFactoryExposer(conf);
+      FileSystem fs = FileSystem.get(conf);
+      Path filePath = new Path(entry.getPath());
+
+      BlockMetaData blockMetaData = footer.getBlocks().get(entry.getRowGroupIndex());
+
+      recordCount = (int) blockMetaData.getRowCount();
+
+      ColumnChunkIncReadStore pageReadStore = new ColumnChunkIncReadStore(recordCount,
+              codecFactoryExposer.getCodecFactory(), fs, filePath);
+
+      for (String[] path : schema.getPaths()) {
+        Type type = schema.getType(path);
+        if (type.isPrimitive()) {
+          ColumnChunkMetaData md = paths.get(ColumnPath.get(path));
+          pageReadStore.addColumn(schema.getColumnDescription(path), md);
+        }
+      }
+
+      writer = new VectorContainerWriter(output);
+      recordMaterializer = new DrillParquetRecordMaterializer(writer, projection);
+      primitiveVectors = writer.getMapVector().getPrimitiveVectors();
+      recordReader = columnIO.getRecordReader(pageReadStore, recordMaterializer);
+    } catch (Exception e) {
+      throw new ExecutionSetupException(e);
+    }
+  }
+
+  private static Type getType(String[] pathSegments, int depth, MessageType schema) {
+    Type type = schema.getType(Arrays.copyOfRange(pathSegments, 0, depth + 1));
+    if (depth + 1 == pathSegments.length) {
+      return type;
+    } else {
+      Preconditions.checkState(!type.isPrimitive());
+      return new GroupType(type.getRepetition(), type.getName(), getType(pathSegments, depth + 1, schema));
+    }
+  }
+
+  private long totalRead = 0;
+
+  @Override
+  public int next() {
+    int count = 0;
+    for (; count < 4000 && totalRead < recordCount; count++, totalRead++) {
+      recordMaterializer.setPosition(count);
+      recordReader.read();
+      if (count % 100 == 0) {
+        if (getPercentFilled() > 85) {
+          break;
+        }
+      }
+    }
+    writer.setValueCount(count);
+    return count;
+  }
+
+  private int getPercentFilled() {
+    int filled = 0;
+    for (ValueVector v : primitiveVectors) {
+      filled = Math.max(filled, ((BaseValueVector) v).getCurrentValueCount() * 100 / v.getValueCapacity());
+      if (v instanceof VariableWidthVector) {
+        filled = Math.max(filled, ((VariableWidthVector) v).getCurrentSizeInBytes() * 100 / ((VariableWidthVector) v).getByteCapacity());
+      }
+    }
+    return filled;
+  }
+
+  @Override
+  public void cleanup() {
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/0d6befca/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet2/DrillParquetRecordMaterializer.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet2/DrillParquetRecordMaterializer.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet2/DrillParquetRecordMaterializer.java
new file mode 100644
index 0000000..69893dc
--- /dev/null
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet2/DrillParquetRecordMaterializer.java
@@ -0,0 +1,57 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.parquet2;
+
+import org.apache.drill.exec.vector.ValueVector;
+import org.apache.drill.exec.vector.complex.MapVector;
+import org.apache.drill.exec.vector.complex.impl.ComplexWriterImpl;
+import org.apache.drill.exec.vector.complex.writer.BaseWriter.ComplexWriter;
+import parquet.io.api.GroupConverter;
+import parquet.io.api.RecordMaterializer;
+import parquet.schema.MessageType;
+
+import java.util.List;
+
+public class DrillParquetRecordMaterializer extends RecordMaterializer<Void> {
+
+  public DrillParquetGroupConverter root;
+  private ComplexWriter complexWriter;
+
+  public DrillParquetRecordMaterializer(ComplexWriter complexWriter, MessageType schema) {
+    this.complexWriter = complexWriter;
+    root = new DrillParquetGroupConverter(complexWriter.rootAsMap(), schema);
+  }
+
+  public void setPosition(int position) {
+    complexWriter.setPosition(position);
+  }
+
+  public boolean ok() {
+    return complexWriter.ok();
+  }
+
+  @Override
+  public Void getCurrentRecord() {
+    return null;
+  }
+
+  @Override
+  public GroupConverter getRootConverter() {
+    return root;
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/0d6befca/exec/java-exec/src/main/java/org/apache/drill/exec/vector/BaseDataValueVector.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/vector/BaseDataValueVector.java b/exec/java-exec/src/main/java/org/apache/drill/exec/vector/BaseDataValueVector.java
index e70f406..7d0fbc7 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/vector/BaseDataValueVector.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/vector/BaseDataValueVector.java
@@ -32,6 +32,7 @@ public abstract class BaseDataValueVector extends BaseValueVector{
 
   protected ByteBuf data = DeadBuf.DEAD_BUFFER;
   protected int valueCount;
+  protected int currentValueCount;
 
  public BaseDataValueVector(MaterializedField field, BufferAllocator allocator) {
    super(field, allocator);
@@ -50,6 +51,14 @@ public abstract class BaseDataValueVector extends BaseValueVector{
     }
   }
 
+  public void setCurrentValueCount(int count) {
+    currentValueCount = count;
+  }
+
+  public int getCurrentValueCount() {
+    return currentValueCount;
+  }
+
 
   @Override
   public ByteBuf[] getBuffers(){

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/0d6befca/exec/java-exec/src/main/java/org/apache/drill/exec/vector/BaseValueVector.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/vector/BaseValueVector.java b/exec/java-exec/src/main/java/org/apache/drill/exec/vector/BaseValueVector.java
index f968435..e310b81 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/vector/BaseValueVector.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/vector/BaseValueVector.java
@@ -58,6 +58,9 @@ public abstract class BaseValueVector implements ValueVector{
     return getField().getAsBuilder();
   }
 
+  public abstract int getCurrentValueCount();
+  public abstract void setCurrentValueCount(int count);
+
   abstract public ByteBuf getData();
 
   abstract class BaseAccessor implements ValueVector.Accessor{

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/0d6befca/exec/java-exec/src/main/java/org/apache/drill/exec/vector/ObjectVector.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/vector/ObjectVector.java b/exec/java-exec/src/main/java/org/apache/drill/exec/vector/ObjectVector.java
index 6998a74..25aff57 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/vector/ObjectVector.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/vector/ObjectVector.java
@@ -125,6 +125,15 @@ public class ObjectVector extends BaseValueVector{
   }
 
   @Override
+  public int getCurrentValueCount() {
+    return 0;
+  }
+
+  @Override
+  public void setCurrentValueCount(int count) {
+  }
+
+  @Override
   public ByteBuf getData() {
    throw new UnsupportedOperationException("ObjectVector does not support this");
   }

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/0d6befca/exec/java-exec/src/main/java/org/apache/drill/exec/vector/VariableWidthVector.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/vector/VariableWidthVector.java b/exec/java-exec/src/main/java/org/apache/drill/exec/vector/VariableWidthVector.java
index 6660351..5c1d3ab 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/vector/VariableWidthVector.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/vector/VariableWidthVector.java
@@ -34,6 +34,8 @@ public interface VariableWidthVector extends ValueVector{
    * @return
    */
   public int getByteCapacity();
+
+  public int getCurrentSizeInBytes();
   
   /**
   * Load the records in the provided buffer based on the given number of values.

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/0d6befca/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/AbstractContainerVector.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/AbstractContainerVector.java b/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/AbstractContainerVector.java
index 9667fd2..5eb1358 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/AbstractContainerVector.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/AbstractContainerVector.java
@@ -24,6 +24,8 @@ import org.apache.drill.common.types.TypeProtos.MinorType;
 import org.apache.drill.exec.record.TypedFieldId;
 import org.apache.drill.exec.vector.ValueVector;
 
+import java.util.List;
+
 public abstract class AbstractContainerVector implements ValueVector{
  static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(AbstractContainerVector.class);
 
@@ -39,6 +41,8 @@ public abstract class AbstractContainerVector implements ValueVector{
     }
   }
 
+  public abstract List<ValueVector> getPrimitiveVectors();
+
   public abstract VectorWithOrdinal getVectorWithOrdinal(String name);
 
 

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/0d6befca/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/MapVector.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/MapVector.java b/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/MapVector.java
index ed2ad8a..480b863 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/MapVector.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/MapVector.java
@@ -76,6 +76,22 @@ public class MapVector extends AbstractContainerVector {
     return vectors.size();
   }
 
+  @Override
+  public List<ValueVector> getPrimitiveVectors() {
+    List<ValueVector> primitiveVectors = Lists.newArrayList();
+    for (ValueVector v : this.vectors.values()) {
+      if (v instanceof AbstractContainerVector) {
+        AbstractContainerVector av = (AbstractContainerVector) v;
+        for (ValueVector vv : av.getPrimitiveVectors()) {
+          primitiveVectors.add(vv);
+        }
+      } else {
+        primitiveVectors.add(v);
+      }
+    }
+    return primitiveVectors;
+  }
+
   transient private MapTransferPair ephPair;
   transient private MapSingleCopier ephPair2;
 

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/0d6befca/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/RepeatedListVector.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/RepeatedListVector.java b/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/RepeatedListVector.java
index d43bf59..57c47d4 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/RepeatedListVector.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/RepeatedListVector.java
@@ -78,6 +78,20 @@ public class RepeatedListVector extends AbstractContainerVector implements Repea
     return vector != null ? 1 : 0;
   }
 
+  @Override
+  public List<ValueVector> getPrimitiveVectors() {
+    List<ValueVector> primitiveVectors = Lists.newArrayList();
+    if (vector instanceof AbstractContainerVector) {
+      for (ValueVector v : ((AbstractContainerVector) vector).getPrimitiveVectors()) {
+        primitiveVectors.add(v);
+      }
+    } else {
+      primitiveVectors.add(vector);
+    }
+    primitiveVectors.add(offsets);
+    return primitiveVectors;
+  }
+
   public RepeatedListVector(SchemaPath path, BufferAllocator allocator){
     this(MaterializedField.create(path, TYPE), allocator);
   }

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/0d6befca/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/RepeatedMapVector.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/RepeatedMapVector.java b/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/RepeatedMapVector.java
index 952fb4b..cb77032 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/RepeatedMapVector.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/RepeatedMapVector.java
@@ -92,6 +92,23 @@ public class RepeatedMapVector extends AbstractContainerVector implements Repeat
   }
 
   @Override
+  public List<ValueVector> getPrimitiveVectors() {
+    List<ValueVector> primitiveVectors = Lists.newArrayList();
+    for (ValueVector v : this.vectors.values()) {
+      if (v instanceof AbstractContainerVector) {
+        AbstractContainerVector av = (AbstractContainerVector) v;
+        for (ValueVector vv : av.getPrimitiveVectors()) {
+          primitiveVectors.add(vv);
+        }
+      } else {
+        primitiveVectors.add(v);
+      }
+    }
+    primitiveVectors.add(offsets);
+    return primitiveVectors;
+  }
+
+  @Override
  public <T extends ValueVector> T addOrGet(String name, MajorType type, Class<T> clazz) {
     ValueVector v = vectors.get(name);
 

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/0d6befca/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/impl/VectorContainerWriter.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/impl/VectorContainerWriter.java b/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/impl/VectorContainerWriter.java
index bc1d367..4f669c0 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/impl/VectorContainerWriter.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/impl/VectorContainerWriter.java
@@ -40,6 +40,10 @@ public class VectorContainerWriter extends AbstractFieldWriter implements Comple
     this.mapRoot = new SingleMapWriter(mapVector, this);
   }
 
+  public MapVector getMapVector() {
+    return mapVector;
+  }
+
   public void reset() {
     setPosition(0);
     resetState();

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/0d6befca/exec/java-exec/src/main/java/parquet/hadoop/CodecFactoryExposer.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/parquet/hadoop/CodecFactoryExposer.java b/exec/java-exec/src/main/java/parquet/hadoop/CodecFactoryExposer.java
index dcd20b1..cb36102 100644
--- a/exec/java-exec/src/main/java/parquet/hadoop/CodecFactoryExposer.java
+++ b/exec/java-exec/src/main/java/parquet/hadoop/CodecFactoryExposer.java
@@ -22,6 +22,7 @@ import java.io.IOException;
 import org.apache.hadoop.conf.Configuration;
 
 import parquet.bytes.BytesInput;
+import parquet.hadoop.CodecFactory.BytesDecompressor;
 import parquet.hadoop.metadata.CompressionCodecName;
 
 public class CodecFactoryExposer{
@@ -39,4 +40,8 @@ public class CodecFactoryExposer{
  public BytesInput decompress(BytesInput bytes, int uncompressedSize, CompressionCodecName codecName) throws IOException {
    return codecFactory.getDecompressor(codecName).decompress(bytes, uncompressedSize);
   }
+
+  public BytesDecompressor getDecompressor(CompressionCodecName codec) {
+    return codecFactory.getDecompressor(codec);
+  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/0d6befca/exec/java-exec/src/main/java/parquet/hadoop/ColumnChunkIncReadStore.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/parquet/hadoop/ColumnChunkIncReadStore.java b/exec/java-exec/src/main/java/parquet/hadoop/ColumnChunkIncReadStore.java
new file mode 100644
index 0000000..e5a477b
--- /dev/null
+++ b/exec/java-exec/src/main/java/parquet/hadoop/ColumnChunkIncReadStore.java
@@ -0,0 +1,190 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package parquet.hadoop;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.compress.Decompressor;
+import parquet.bytes.BytesInput;
+import parquet.column.ColumnDescriptor;
+import parquet.column.page.DictionaryPage;
+import parquet.column.page.Page;
+import parquet.column.page.PageReadStore;
+import parquet.column.page.PageReader;
+import parquet.format.PageHeader;
+import parquet.format.PageType;
+import parquet.format.Util;
+import parquet.format.converter.ParquetMetadataConverter;
+import parquet.hadoop.CodecFactory.BytesDecompressor;
+import parquet.hadoop.metadata.ColumnChunkMetaData;
+import parquet.hadoop.metadata.FileMetaData;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+
+public class ColumnChunkIncReadStore implements PageReadStore {
+
+  private static ParquetMetadataConverter parquetMetadataConverter = new ParquetMetadataConverter();
+
+  private CodecFactory codecFactory = new CodecFactory(new Configuration());
+  private FileSystem fs;
+  private Path path;
+  private long rowCount;
+  private List<FSDataInputStream> streams = new ArrayList<>();
+
+  public ColumnChunkIncReadStore(long rowCount, CodecFactory codecFactory, FileSystem fs, Path path) {
+    this.codecFactory = codecFactory;
+    this.fs = fs;
+    this.path = path;
+    this.rowCount = rowCount;
+  }
+
+
+  public class ColumnChunkIncPageReader implements PageReader {
+
+    ColumnChunkMetaData metaData;
+    long fileOffset;
+    long size;
+    private long valueReadSoFar = 0;
+
+    private DictionaryPage dictionaryPage;
+    private FSDataInputStream in;
+    private BytesDecompressor decompressor;
+
+    public ColumnChunkIncPageReader(ColumnChunkMetaData metaData, FSDataInputStream in) {
+      this.metaData = metaData;
+      this.size = metaData.getTotalSize();
+      this.fileOffset = metaData.getStartingPos();
+      this.in = in;
+      this.decompressor = codecFactory.getDecompressor(metaData.getCodec());
+    }
+
+    public ColumnChunkIncPageReader(ColumnChunkMetaData metaData, FSDataInputStream in, CodecFactory codecFactory) {
+      this.metaData = metaData;
+      this.size = metaData.getTotalSize();
+      this.fileOffset = metaData.getStartingPos();
+      this.in = in;
+      this.decompressor = codecFactory.getDecompressor(metaData.getCodec());
+    }
+
+    @Override
+    public DictionaryPage readDictionaryPage() {
+      if (dictionaryPage == null) {
+        try {
+          long pos = in.getPos();
+          PageHeader pageHeader = Util.readPageHeader(in);
+          if (pageHeader.getDictionary_page_header() == null) {
+            in.seek(pos);
+            return null;
+          }
+          dictionaryPage =
+                  new DictionaryPage(
+                          decompressor.decompress(BytesInput.from(in, pageHeader.compressed_page_size), pageHeader.getUncompressed_page_size()),
+                          pageHeader.getDictionary_page_header().getNum_values(),
+                          parquetMetadataConverter.getEncoding(pageHeader.dictionary_page_header.encoding)
+                  );
+          System.out.println(dictionaryPage);
+        } catch (IOException e) {
+          throw new RuntimeException(e);
+        }
+//        if (dictionaryPage == null) {
+//          throw new RuntimeException("Dictionary page null");
+//        }
+      }
+      return dictionaryPage;
+    }
+
+    @Override
+    public long getTotalValueCount() {
+      return metaData.getValueCount();
+    }
+
+    @Override
+    public Page readPage() {
+      try {
+        while(valueReadSoFar < metaData.getValueCount()) {
+          PageHeader pageHeader = Util.readPageHeader(in);
+          switch (pageHeader.type) {
+            case DICTIONARY_PAGE:
+              if (dictionaryPage == null) {
+                dictionaryPage =
+                        new DictionaryPage(
+                                decompressor.decompress(BytesInput.from(in, 
pageHeader.compressed_page_size), pageHeader.getUncompressed_page_size()),
+                                pageHeader.uncompressed_page_size,
+                                
parquetMetadataConverter.getEncoding(pageHeader.dictionary_page_header.encoding)
+                        );
+              } else {
+                in.skip(pageHeader.compressed_page_size);
+              }
+              break;
+            case DATA_PAGE:
+              valueReadSoFar += pageHeader.data_page_header.getNum_values();
+              return new Page(
+                      decompressor.decompress(BytesInput.from(in, pageHeader.compressed_page_size), pageHeader.getUncompressed_page_size()),
+                      pageHeader.data_page_header.num_values,
+                      pageHeader.uncompressed_page_size,
+                      parquetMetadataConverter.getEncoding(pageHeader.data_page_header.repetition_level_encoding),
+                      parquetMetadataConverter.getEncoding(pageHeader.data_page_header.definition_level_encoding),
+                      parquetMetadataConverter.getEncoding(pageHeader.data_page_header.encoding)
+              );
+            default:
+              in.skip(pageHeader.compressed_page_size);
+              break;
+          }
+        }
+        in.close();
+        return null;
+      } catch (IOException e) {
+        throw new RuntimeException(e);
+      }
+    }
+  }
+
+  private Map<ColumnDescriptor, ColumnChunkIncPageReader> columns = new HashMap<>();
+
+  public void addColumn(ColumnDescriptor descriptor, ColumnChunkMetaData metaData) throws IOException {
+    FSDataInputStream in = fs.open(path);
+    streams.add(in);
+    in.seek(metaData.getStartingPos());
+    ColumnChunkIncPageReader reader = new ColumnChunkIncPageReader(metaData, in);
+
+    columns.put(descriptor, reader);
+  }
+
+  public void close() throws IOException {
+    for (FSDataInputStream stream : streams) {
+      stream.close();
+    }
+  }
+
+  @Override
+  public PageReader getPageReader(ColumnDescriptor descriptor) {
+    return columns.get(descriptor);
+  }
+
+  @Override
+  public long getRowCount() {
+    return rowCount;
+  }
+}
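
A minimal wiring sketch for the new page store follows. Only the ColumnChunkIncReadStore constructor, addColumn, getPageReader, and close come from this commit; reading the footer with ParquetFileReader, the class and variable names, and the assumption that the row group lists its column chunks in the same order as the schema's leaf columns are illustrative.

package parquet.hadoop;   // placed alongside the new classes so CodecFactory is accessible

import java.util.List;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

import parquet.column.ColumnDescriptor;
import parquet.hadoop.metadata.BlockMetaData;
import parquet.hadoop.metadata.ColumnChunkMetaData;
import parquet.hadoop.metadata.ParquetMetadata;
import parquet.schema.MessageType;

public class IncReadStoreSketch {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    Path path = new Path(args[0]);                       // e.g. a complex.parquet file
    FileSystem fs = path.getFileSystem(conf);

    ParquetMetadata footer = ParquetFileReader.readFooter(conf, path);
    BlockMetaData rowGroup = footer.getBlocks().get(0);  // first row group only
    MessageType schema = footer.getFileMetaData().getSchema();

    ColumnChunkIncReadStore pageStore =
        new ColumnChunkIncReadStore(rowGroup.getRowCount(), new CodecFactory(conf), fs, path);

    // Register every leaf column; assumes chunks are listed in schema order.
    List<ColumnDescriptor> descriptors = schema.getColumns();
    List<ColumnChunkMetaData> chunks = rowGroup.getColumns();
    for (int i = 0; i < descriptors.size(); i++) {
      pageStore.addColumn(descriptors.get(i), chunks.get(i));
    }

    // Pages are now served incrementally, one column chunk at a time.
    System.out.println(pageStore.getPageReader(descriptors.get(0)).readPage());
    pageStore.close();
  }
}

The DrillParquetReader added by this commit presumably performs a similar per-row-group setup before handing the store to Parquet's record assembly.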

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/0d6befca/exec/java-exec/src/test/java/org/apache/drill/ParquetSchemaMerge.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/ParquetSchemaMerge.java b/exec/java-exec/src/test/java/org/apache/drill/ParquetSchemaMerge.java
new file mode 100644
index 0000000..af38d08
--- /dev/null
+++ b/exec/java-exec/src/test/java/org/apache/drill/ParquetSchemaMerge.java
@@ -0,0 +1,48 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill;
+
+import parquet.schema.GroupType;
+import parquet.schema.MessageType;
+import parquet.schema.PrimitiveType;
+import parquet.schema.PrimitiveType.PrimitiveTypeName;
+import parquet.schema.Type;
+import parquet.schema.Type.Repetition;
+
+public class ParquetSchemaMerge {
+  public static void main(String[] args) {
+    MessageType message1;
+    MessageType message2;
+
+    PrimitiveType c = new PrimitiveType(Repetition.OPTIONAL, PrimitiveTypeName.INT32, "c");
+    GroupType b = new GroupType(Repetition.REQUIRED, "b");
+    GroupType a = new GroupType(Repetition.OPTIONAL, "a", b);
+    message1 = new MessageType("root", a);
+
+    PrimitiveType c2 = new PrimitiveType(Repetition.OPTIONAL, PrimitiveTypeName.INT32, "d");
+    GroupType b2 = new GroupType(Repetition.OPTIONAL, "b", c2);
+    GroupType a2 = new GroupType(Repetition.OPTIONAL, "a", b2);
+    message2 = new MessageType("root", a2);
+
+    MessageType message3 = message1.union(message2);
+
+    StringBuilder builder = new StringBuilder();
+    message3.writeToStringBuilder(builder, "");
+    System.out.println(builder);
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/0d6befca/exec/java-exec/src/test/java/org/apache/drill/TestExampleQueries.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/java/org/apache/drill/TestExampleQueries.java b/exec/java-exec/src/test/java/org/apache/drill/TestExampleQueries.java
index c2eba39..8ed8171 100644
--- a/exec/java-exec/src/test/java/org/apache/drill/TestExampleQueries.java
+++ b/exec/java-exec/src/test/java/org/apache/drill/TestExampleQueries.java
@@ -24,6 +24,13 @@ import org.junit.Test;
 public class TestExampleQueries extends BaseTestQuery{
   static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(TestExampleQueries.class);
 
+  @Test
+  public void testParquetComplex() throws Exception {
+    test("select recipe from cp.`parquet/complex.parquet`");
+    test("select * from cp.`parquet/complex.parquet`");
+    test("select recipe, c.inventor.name as name, c.inventor.age as age from 
cp.`parquet/complex.parquet` c");
+  }
+  
   @Test // see DRILL-553
   public void testQueryWithNullValues() throws Exception {
     test("select count(*) from cp.`customer.json` limit 1");

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/0d6befca/exec/java-exec/src/test/resources/parquet/complex.parquet
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/test/resources/parquet/complex.parquet b/exec/java-exec/src/test/resources/parquet/complex.parquet
new file mode 100644
index 0000000..5a8d5a2
Binary files /dev/null and b/exec/java-exec/src/test/resources/parquet/complex.parquet differ
