DRILL-1281: Use ByteBuffer read codepath in complex parquet reader

Project: http://git-wip-us.apache.org/repos/asf/incubator-drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-drill/commit/c3b15af0
Tree: http://git-wip-us.apache.org/repos/asf/incubator-drill/tree/c3b15af0
Diff: http://git-wip-us.apache.org/repos/asf/incubator-drill/diff/c3b15af0

Branch: refs/heads/master
Commit: c3b15af0292270e75a3f98a9225e7dceb24c96c3
Parents: 02e4824
Author: Steven Phillips <[email protected]>
Authored: Mon Aug 11 11:58:43 2014 -0700
Committer: Steven Phillips <[email protected]>
Committed: Mon Aug 18 14:46:43 2014 -0700

----------------------------------------------------------------------
 contrib/storage-hbase/pom.xml                   | 26 +++++++++-
 distribution/pom.xml                            | 13 ++---
 distribution/src/assemble/bin.xml               |  2 +-
 distribution/src/resources/drill-config.sh      | 27 -----------
 exec/java-exec/pom.xml                          | 50 ++++++++++++++++++++
 .../exec/store/parquet2/DrillParquetReader.java | 11 ++++-
 .../parquet/hadoop/ColumnChunkIncReadStore.java | 46 ++++++++++++------
 pom.xml                                         |  4 +-
 8 files changed, 125 insertions(+), 54 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/c3b15af0/contrib/storage-hbase/pom.xml
----------------------------------------------------------------------
diff --git a/contrib/storage-hbase/pom.xml b/contrib/storage-hbase/pom.xml
index 82bf4bb..642bc34 100644
--- a/contrib/storage-hbase/pom.xml
+++ b/contrib/storage-hbase/pom.xml
@@ -180,6 +180,18 @@
               <artifactId>asm</artifactId>
               <groupId>asm</groupId>
             </exclusion>
+            <exclusion>
+              <groupId>com.sun.jersey</groupId>
+              <artifactId>jersey-core</artifactId>
+            </exclusion>
+            <exclusion>
+              <groupId>com.sun.jersey</groupId>
+              <artifactId>jersey-json</artifactId>
+            </exclusion>
+            <exclusion>
+              <groupId>com.sun.jersey</groupId>
+              <artifactId>jersey-server</artifactId>
+            </exclusion>
           </exclusions>
         </dependency>
         <!-- test dependencies -->
@@ -193,6 +205,18 @@
               <artifactId>commons-logging</artifactId>
               <groupId>commons-logging</groupId>
             </exclusion>
+            <exclusion>
+              <groupId>com.sun.jersey</groupId>
+              <artifactId>jersey-core</artifactId>
+            </exclusion>
+            <exclusion>
+              <groupId>com.sun.jersey</groupId>
+              <artifactId>jersey-json</artifactId>
+            </exclusion>
+            <exclusion>
+              <groupId>com.sun.jersey</groupId>
+              <artifactId>jersey-server</artifactId>
+            </exclusion>
           </exclusions>
         </dependency>
         <dependency>
@@ -232,7 +256,7 @@
         <dependency>
           <groupId>org.apache.hbase</groupId>
           <artifactId>hbase</artifactId>
-          <version>0.94.13-mapr-1401-m7-3.0.2</version>
+          <version>0.94.17-mapr-1405-m7-4.0.0-FCS</version>
           <exclusions>
             <exclusion>
               <artifactId>commons-logging</artifactId>

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/c3b15af0/distribution/pom.xml
----------------------------------------------------------------------
diff --git a/distribution/pom.xml b/distribution/pom.xml
index 87738bb..1429902 100644
--- a/distribution/pom.xml
+++ b/distribution/pom.xml
@@ -65,6 +65,12 @@
       <groupId>org.apache.drill.contrib</groupId>
       <artifactId>drill-storage-hbase</artifactId>
       <version>${project.version}</version>
+      <exclusions>
+        <exclusion>
+            <groupId>org.apache.hadoop</groupId>
+            <artifactId>hadoop-core</artifactId>
+        </exclusion>
+      </exclusions>
     </dependency>
     <dependency>
       <groupId>org.apache.drill.contrib.storage-hive</groupId>
@@ -72,14 +78,9 @@
       <version>${project.version}</version>
     </dependency>
     <dependency>
-      <groupId>com.google.protobuf</groupId>
-      <artifactId>protobuf-java</artifactId>
-      <version>2.4.1</version>
-    </dependency>
-    <dependency>
      <groupId>de.huxhorn.lilith</groupId>
      <artifactId>de.huxhorn.lilith.logback.appender.multiplex-classic</artifactId>
-      <version>0.9.42.1</version>
+      <version>0.9.44</version>
       <scope>test</scope>
     </dependency>
   </dependencies>

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/c3b15af0/distribution/src/assemble/bin.xml
----------------------------------------------------------------------
diff --git a/distribution/src/assemble/bin.xml b/distribution/src/assemble/bin.xml
index c1675e9..6d7fb6b 100644
--- a/distribution/src/assemble/bin.xml
+++ b/distribution/src/assemble/bin.xml
@@ -90,7 +90,7 @@
       <unpack>false</unpack>
       <useProjectArtifact>false</useProjectArtifact>
       <includes>
-        <include>com.google.protobuf:protobuf-java:jar:2.4.1</include>
+        <include>com.google.protobuf:protobuf-java:jar:2.5</include>
       </includes>
       <scope>test</scope>
     </dependencySet>

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/c3b15af0/distribution/src/resources/drill-config.sh
----------------------------------------------------------------------
diff --git a/distribution/src/resources/drill-config.sh b/distribution/src/resources/drill-config.sh
index 795e97a..621f947 100644
--- a/distribution/src/resources/drill-config.sh
+++ b/distribution/src/resources/drill-config.sh
@@ -88,33 +88,6 @@ if [ ! -d $DRILL_LOG_DIR ]; then
   mkdir -p $DRILL_LOG_DIR
 fi
 
-# If HADOOP_HOME is specified, add all Hadoop jars except those
-# specifically excluded in $DRILL_HOME/bin/hadoop-excludes.txt
-if [ "${HADOOP_HOME}x" != "x" ] ; then
-  HADOOP_CLASSPATH=""
-  for jar in `ls $HADOOP_HOME/lib/*jar` ; do
-    echo $jar | grep -v -f $DRILL_HOME/bin/hadoop-excludes.txt >/dev/null
-    if [ "$?" -eq "0" ] ; then
-      HADOOP_CLASSPATH=$jar:$HADOOP_CLASSPATH
-    fi
-  done
-  HADOOP_CLASSPATH=$HADOOP_HOME/conf:$HADOOP_CLASSPATH
-fi
-if [ "${HBASE_HOME}x" != "x" ]
-then
-  HBASE_CLASSPATH=""
-  for jar in `ls $HBASE_HOME/*jar`
-  do
-    echo $jar | grep -v -f $DRILL_HOME/bin/hadoop-excludes.txt >/dev/null
-    if [ "$?" -eq "0" ]
-    then
-      HBASE_CLASSPATH=$jar:$HBASE_CLASSPATH
-    fi
-  done
-  HBASE_CLASSPATH=$HBASE_HOME/conf:$HBASE_CLASSPATH
-  export HBASE_CLASSPATH=$HBASE_CLASSPATH
-fi
-
 # Add Drill conf folder at the beginning of the classpath
 CP=$DRILL_CONF_DIR
 

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/c3b15af0/exec/java-exec/pom.xml
----------------------------------------------------------------------
diff --git a/exec/java-exec/pom.xml b/exec/java-exec/pom.xml
index 21dfc67..685f2fe 100644
--- a/exec/java-exec/pom.xml
+++ b/exec/java-exec/pom.xml
@@ -357,6 +357,10 @@
               <groupId>com.sun.jersey</groupId>
               <artifactId>jersey-json</artifactId>
             </exclusion>
+            <exclusion>
+              <groupId>com.sun.jersey</groupId>
+              <artifactId>jersey-client</artifactId>
+            </exclusion>
           </exclusions>
         </dependency>
       </dependencies>
@@ -367,6 +371,16 @@
         <dependency>
           <groupId>org.apache.hadoop</groupId>
           <artifactId>hadoop-client</artifactId>
+          <exclusions>
+            <exclusion>
+              <groupId>com.sun.jersey</groupId>
+              <artifactId>jersey-client</artifactId>
+            </exclusion>
+            <exclusion>
+              <groupId>com.sun.jersey</groupId>
+              <artifactId>jersey-core</artifactId>
+            </exclusion>
+          </exclusions>
         </dependency>
         <dependency>
           <groupId>com.mapr.hadoop</groupId>
@@ -393,6 +407,42 @@
               <groupId>org.slf4j</groupId>
               <artifactId>slf4j-log4j12</artifactId>
             </exclusion>
+            <exclusion>
+              <groupId>com.sun.jersey</groupId>
+              <artifactId>jersey-client</artifactId>
+            </exclusion>
+            <exclusion>
+              <groupId>com.sun.jersey</groupId>
+              <artifactId>jersey-core</artifactId>
+            </exclusion>
+            <exclusion>
+              <groupId>com.sun.jersey</groupId>
+              <artifactId>jersey-json</artifactId>
+            </exclusion>
+            <exclusion>
+              <groupId>com.sun.jersey</groupId>
+              <artifactId>jersey-server</artifactId>
+            </exclusion>
+            <exclusion>
+              <groupId>org.mortbay.jetty</groupId>
+              <artifactId>jetty</artifactId>
+            </exclusion>
+            <exclusion>
+              <groupId>org.mortbay.jetty</groupId>
+              <artifactId>jsp-2.1</artifactId>
+            </exclusion>
+            <exclusion>
+              <groupId>org.mortbay.jetty</groupId>
+              <artifactId>jsp-api-2.1</artifactId>
+            </exclusion>
+            <exclusion>
+              <groupId>org.mortbay.jetty</groupId>
+              <artifactId>servlet-api-2.5</artifactId>
+            </exclusion>
+            <exclusion>
+              <groupId>org.apache.hadoop</groupId>
+              <artifactId>hadoop-core</artifactId>
+            </exclusion>
           </exclusions>
         </dependency>
       </dependencies>

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/c3b15af0/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet2/DrillParquetReader.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet2/DrillParquetReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet2/DrillParquetReader.java
index 0d2a225..f47acab 100644
--- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet2/DrillParquetReader.java
+++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet2/DrillParquetReader.java
@@ -46,6 +46,7 @@ import parquet.schema.GroupType;
 import parquet.schema.MessageType;
 import parquet.schema.Type;
 
+import java.io.IOException;
 import java.util.Arrays;
 import java.util.HashMap;
 import java.util.List;
@@ -61,6 +62,7 @@ public class DrillParquetReader implements RecordReader {
   private RowGroupReadEntry entry;
   private List<SchemaPath> columns;
   private VectorContainerWriter writer;
+  private ColumnChunkIncReadStore pageReadStore;
   private parquet.io.RecordReader<Void> recordReader;
   private DrillParquetRecordMaterializer recordMaterializer;
   private int recordCount;
@@ -142,8 +144,8 @@ public class DrillParquetReader implements RecordReader {
 
       recordCount = (int) blockMetaData.getRowCount();
 
-      ColumnChunkIncReadStore pageReadStore = new ColumnChunkIncReadStore(recordCount,
-              codecFactoryExposer.getCodecFactory(), fs, filePath);
+      pageReadStore = new ColumnChunkIncReadStore(recordCount,
+              codecFactoryExposer.getCodecFactory(), operatorContext.getAllocator(), fs, filePath);
 
       for (String[] path : schema.getPaths()) {
         Type type = schema.getType(path);
@@ -203,6 +205,11 @@ public class DrillParquetReader implements RecordReader {
 
   @Override
   public void cleanup() {
+    try {
+      pageReadStore.close();
+    } catch (IOException e) {
+      logger.warn("Failure while closing PageReadStore", e);
+    }
   }
 
   public void setOperatorContext(OperatorContext operatorContext) {
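
For reference, the reader-side change keeps the ColumnChunkIncReadStore in a field and closes it in cleanup(), so the direct buffer backing the last page read (allocated from the operator's BufferAllocator) is released rather than leaked. A minimal sketch of that lifecycle, using only the names visible in the diff above (setup arguments abbreviated):

    // Sketch only -- mirrors the pattern introduced by this commit.
    private ColumnChunkIncReadStore pageReadStore;

    // during setup: pass the operator's allocator so pages are read into
    // allocator-tracked direct memory
    //   pageReadStore = new ColumnChunkIncReadStore(recordCount,
    //       codecFactoryExposer.getCodecFactory(), operatorContext.getAllocator(), fs, filePath);

    @Override
    public void cleanup() {
      try {
        pageReadStore.close();   // closes input streams, releases any held page buffer
      } catch (IOException e) {
        logger.warn("Failure while closing PageReadStore", e);
      }
    }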

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/c3b15af0/exec/java-exec/src/main/java/parquet/hadoop/ColumnChunkIncReadStore.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/parquet/hadoop/ColumnChunkIncReadStore.java b/exec/java-exec/src/main/java/parquet/hadoop/ColumnChunkIncReadStore.java
index e5a477b..379d3e6 100644
--- a/exec/java-exec/src/main/java/parquet/hadoop/ColumnChunkIncReadStore.java
+++ b/exec/java-exec/src/main/java/parquet/hadoop/ColumnChunkIncReadStore.java
@@ -17,6 +17,8 @@
  */
 package parquet.hadoop;
 
+import io.netty.buffer.ByteBuf;
+import org.apache.drill.exec.memory.BufferAllocator;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.FileSystem;
@@ -35,8 +37,10 @@ import parquet.format.converter.ParquetMetadataConverter;
 import parquet.hadoop.CodecFactory.BytesDecompressor;
 import parquet.hadoop.metadata.ColumnChunkMetaData;
 import parquet.hadoop.metadata.FileMetaData;
+import parquet.hadoop.util.CompatibilityUtil;
 
 import java.io.IOException;
+import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
@@ -48,13 +52,15 @@ public class ColumnChunkIncReadStore implements PageReadStore {
  private static ParquetMetadataConverter parquetMetadataConverter = new ParquetMetadataConverter();
 
   private CodecFactory codecFactory = new CodecFactory(new Configuration());
+  private BufferAllocator allocator;
   private FileSystem fs;
   private Path path;
   private long rowCount;
   private List<FSDataInputStream> streams = new ArrayList();
 
-  public ColumnChunkIncReadStore(long rowCount, CodecFactory codecFactory, FileSystem fs, Path path) {
+  public ColumnChunkIncReadStore(long rowCount, CodecFactory codecFactory, BufferAllocator allocator, FileSystem fs, Path path) {
     this.codecFactory = codecFactory;
+    this.allocator = allocator;
     this.fs = fs;
     this.path = path;
     this.rowCount = rowCount;
@@ -64,6 +70,7 @@ public class ColumnChunkIncReadStore implements PageReadStore {
   public class ColumnChunkIncPageReader implements PageReader {
 
     ColumnChunkMetaData metaData;
+    ColumnDescriptor columnDescriptor;
     long fileOffset;
     long size;
     private long valueReadSoFar = 0;
@@ -72,16 +79,11 @@ public class ColumnChunkIncReadStore implements PageReadStore {
     private FSDataInputStream in;
     private BytesDecompressor decompressor;
 
-    public ColumnChunkIncPageReader(ColumnChunkMetaData metaData, FSDataInputStream in) {
-      this.metaData = metaData;
-      this.size = metaData.getTotalSize();
-      this.fileOffset = metaData.getStartingPos();
-      this.in = in;
-      this.decompressor = codecFactory.getDecompressor(metaData.getCodec());
-    }
+    private ByteBuf lastPage;
 
-    public ColumnChunkIncPageReader(ColumnChunkMetaData metaData, FSDataInputStream in, CodecFactory codecFactory) {
+    public ColumnChunkIncPageReader(ColumnChunkMetaData metaData, ColumnDescriptor columnDescriptor, FSDataInputStream in) {
       this.metaData = metaData;
+      this.columnDescriptor = columnDescriptor;
       this.size = metaData.getTotalSize();
       this.fileOffset = metaData.getStartingPos();
       this.in = in;
@@ -104,13 +106,9 @@ public class ColumnChunkIncReadStore implements PageReadStore {
                          pageHeader.getDictionary_page_header().getNum_values(),
                          parquetMetadataConverter.getEncoding(pageHeader.dictionary_page_header.encoding)
                   );
-          System.out.println(dictionaryPage);
         } catch (IOException e) {
           throw new RuntimeException(e);
         }
-//        if (dictionaryPage == null) {
-//          throw new RuntimeException("Dictionary page null");
-//        }
       }
       return dictionaryPage;
     }
@@ -123,6 +121,10 @@ public class ColumnChunkIncReadStore implements PageReadStore {
     @Override
     public Page readPage() {
       try {
+        if (lastPage != null) {
+          lastPage.release();
+          lastPage = null;
+        }
         while(valueReadSoFar < metaData.getValueCount()) {
           PageHeader pageHeader = Util.readPageHeader(in);
           switch (pageHeader.type) {
@@ -140,10 +142,15 @@ public class ColumnChunkIncReadStore implements PageReadStore {
               break;
             case DATA_PAGE:
               valueReadSoFar += pageHeader.data_page_header.getNum_values();
+              ByteBuf buf = allocator.buffer(pageHeader.compressed_page_size);
+              lastPage = buf;
+              ByteBuffer buffer = buf.nioBuffer(0, pageHeader.compressed_page_size);
+              CompatibilityUtil.getBuf(in, buffer, pageHeader.compressed_page_size);
               return new Page(
-                      decompressor.decompress(BytesInput.from(in,pageHeader.compressed_page_size), pageHeader.getUncompressed_page_size()),
+                      decompressor.decompress(BytesInput.from(buffer, 0, pageHeader.compressed_page_size), pageHeader.compressed_page_size),
                       pageHeader.data_page_header.num_values,
                       pageHeader.uncompressed_page_size,
+                      parquetMetadataConverter.fromParquetStatistics(pageHeader.data_page_header.statistics, columnDescriptor.getType()),
                       parquetMetadataConverter.getEncoding(pageHeader.data_page_header.repetition_level_encoding),
                       parquetMetadataConverter.getEncoding(pageHeader.data_page_header.definition_level_encoding),
                       parquetMetadataConverter.getEncoding(pageHeader.data_page_header.encoding)
@@ -159,6 +166,12 @@ public class ColumnChunkIncReadStore implements PageReadStore {
         throw new RuntimeException(e);
       }
     }
+
+    void close() {
+      if (lastPage != null) {
+        lastPage.release();
+      }
+    }
   }
 
  private Map<ColumnDescriptor, ColumnChunkIncPageReader> columns = new HashMap();
@@ -167,7 +180,7 @@ public class ColumnChunkIncReadStore implements PageReadStore {
     FSDataInputStream in = fs.open(path);
     streams.add(in);
     in.seek(metaData.getStartingPos());
-    ColumnChunkIncPageReader reader = new ColumnChunkIncPageReader(metaData, in);
+    ColumnChunkIncPageReader reader = new ColumnChunkIncPageReader(metaData, descriptor, in);
 
     columns.put(descriptor, reader);
   }
@@ -176,6 +189,9 @@ public class ColumnChunkIncReadStore implements PageReadStore {
     for (FSDataInputStream stream : streams) {
       stream.close();
     }
+    for (ColumnChunkIncPageReader reader : columns.values()) {
+      reader.close();
+    }
   }
 
   @Override
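
The heart of the change is readPage() above: instead of handing the FSDataInputStream to BytesInput.from(in, size), which copies page bytes into on-heap arrays, the compressed page is now read into a direct ByteBuf taken from Drill's BufferAllocator, exposed to parquet-mr as an NIO ByteBuffer via CompatibilityUtil.getBuf(), and released on the next readPage() call or in close(). A minimal sketch of that allocate/read/release cycle, using only calls that appear in the diff (the helper method name is illustrative, and the decompressor's second argument is written here as the uncompressed size that the parquet-mr API expects):

    // Sketch only -- types and imports as in ColumnChunkIncReadStore above.
    private ByteBuf lastPage;

    private BytesInput readCompressedPage(FSDataInputStream in, PageHeader pageHeader) throws IOException {
      // Release the direct buffer that backed the previously returned page.
      if (lastPage != null) {
        lastPage.release();
        lastPage = null;
      }

      int compressedSize = pageHeader.compressed_page_size;

      // Allocator-tracked direct buffer instead of an on-heap byte[].
      ByteBuf buf = allocator.buffer(compressedSize);
      lastPage = buf;

      // View the Netty buffer as an NIO ByteBuffer and fill it with a
      // ByteBuffer-based read from the file.
      ByteBuffer buffer = buf.nioBuffer(0, compressedSize);
      CompatibilityUtil.getBuf(in, buffer, compressedSize);

      // Decompress straight from the ByteBuffer-backed BytesInput.
      return decompressor.decompress(
          BytesInput.from(buffer, 0, compressedSize),
          pageHeader.getUncompressed_page_size());
    }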

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/c3b15af0/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index a984a4f..457ef3e 100644
--- a/pom.xml
+++ b/pom.xml
@@ -507,7 +507,7 @@
     <dependency>
      <groupId>de.huxhorn.lilith</groupId>
      <artifactId>de.huxhorn.lilith.logback.appender.multiplex-classic</artifactId>
-      <version>0.9.43</version>
+      <version>0.9.44</version>
       <scope>test</scope>
     </dependency>
 
@@ -804,7 +804,7 @@
           <dependency>
             <groupId>org.apache.hbase</groupId>
             <artifactId>hbase</artifactId>
-            <version>0.94.17-mapr-1405-4.0.0-FCS</version>
+            <version>0.94.17-mapr-1405-m7-4.0.0-FCS</version>
             <exclusions>
               <exclusion>
                 <groupId>commons-logging</groupId>
