DRILL-1281: Use ByteBuffer read codepath in complex parquet reader
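
The parquet2 (complex) reader previously streamed each compressed page out of the
FSDataInputStream via BytesInput.from(in, size). With this change, ColumnChunkIncReadStore
reads each data page into a ByteBuf obtained from Drill's BufferAllocator (filled through an
NIO ByteBuffer by CompatibilityUtil.getBuf), releases the previous page's buffer before
fetching the next, and releases the last one when DrillParquetReader.cleanup() closes the
store. A sketch of this buffer lifecycle follows the diff.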
Project: http://git-wip-us.apache.org/repos/asf/incubator-drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-drill/commit/c3b15af0
Tree: http://git-wip-us.apache.org/repos/asf/incubator-drill/tree/c3b15af0
Diff: http://git-wip-us.apache.org/repos/asf/incubator-drill/diff/c3b15af0

Branch: refs/heads/master
Commit: c3b15af0292270e75a3f98a9225e7dceb24c96c3
Parents: 02e4824
Author: Steven Phillips <[email protected]>
Authored: Mon Aug 11 11:58:43 2014 -0700
Committer: Steven Phillips <[email protected]>
Committed: Mon Aug 18 14:46:43 2014 -0700

----------------------------------------------------------------------
 contrib/storage-hbase/pom.xml                   | 26 +++++++++-
 distribution/pom.xml                            | 13 ++---
 distribution/src/assemble/bin.xml               |  2 +-
 distribution/src/resources/drill-config.sh      | 27 -----------
 exec/java-exec/pom.xml                          | 50 ++++++++++++++++++++
 .../exec/store/parquet2/DrillParquetReader.java | 11 ++++-
 .../parquet/hadoop/ColumnChunkIncReadStore.java | 46 ++++++++++++------
 pom.xml                                         |  4 +-
 8 files changed, 125 insertions(+), 54 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/c3b15af0/contrib/storage-hbase/pom.xml
----------------------------------------------------------------------
diff --git a/contrib/storage-hbase/pom.xml b/contrib/storage-hbase/pom.xml
index 82bf4bb..642bc34 100644
--- a/contrib/storage-hbase/pom.xml
+++ b/contrib/storage-hbase/pom.xml
@@ -180,6 +180,18 @@
             <artifactId>asm</artifactId>
             <groupId>asm</groupId>
           </exclusion>
+          <exclusion>
+            <groupId>com.sun.jersey</groupId>
+            <artifactId>jersey-core</artifactId>
+          </exclusion>
+          <exclusion>
+            <groupId>com.sun.jersey</groupId>
+            <artifactId>jersey-json</artifactId>
+          </exclusion>
+          <exclusion>
+            <groupId>com.sun.jersey</groupId>
+            <artifactId>jersey-server</artifactId>
+          </exclusion>
         </exclusions>
       </dependency>
       <!-- test dependencies -->
@@ -193,6 +205,18 @@
             <artifactId>commons-logging</artifactId>
             <groupId>commons-logging</groupId>
           </exclusion>
+          <exclusion>
+            <groupId>com.sun.jersey</groupId>
+            <artifactId>jersey-core</artifactId>
+          </exclusion>
+          <exclusion>
+            <groupId>com.sun.jersey</groupId>
+            <artifactId>jersey-json</artifactId>
+          </exclusion>
+          <exclusion>
+            <groupId>com.sun.jersey</groupId>
+            <artifactId>jersey-server</artifactId>
+          </exclusion>
         </exclusions>
       </dependency>
       <dependency>
@@ -232,7 +256,7 @@
         <dependency>
           <groupId>org.apache.hbase</groupId>
           <artifactId>hbase</artifactId>
-          <version>0.94.13-mapr-1401-m7-3.0.2</version>
+          <version>0.94.17-mapr-1405-m7-4.0.0-FCS</version>
          <exclusions>
            <exclusion>
              <artifactId>commons-logging</artifactId>

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/c3b15af0/distribution/pom.xml
----------------------------------------------------------------------
diff --git a/distribution/pom.xml b/distribution/pom.xml
index 87738bb..1429902 100644
--- a/distribution/pom.xml
+++ b/distribution/pom.xml
@@ -65,6 +65,12 @@
       <groupId>org.apache.drill.contrib</groupId>
       <artifactId>drill-storage-hbase</artifactId>
       <version>${project.version}</version>
+      <exclusions>
+        <exclusion>
+          <groupId>org.apache.hadoop</groupId>
+          <artifactId>hadoop-core</artifactId>
+        </exclusion>
+      </exclusions>
     </dependency>
     <dependency>
       <groupId>org.apache.drill.contrib.storage-hive</groupId>
@@ -72,14 +78,9 @@
       <version>${project.version}</version>
     </dependency>
     <dependency>
-      <groupId>com.google.protobuf</groupId>
-      <artifactId>protobuf-java</artifactId>
-      <version>2.4.1</version>
-    </dependency>
-    <dependency>
       <groupId>de.huxhorn.lilith</groupId>
       <artifactId>de.huxhorn.lilith.logback.appender.multiplex-classic</artifactId>
-      <version>0.9.42.1</version>
+      <version>0.9.44</version>
       <scope>test</scope>
     </dependency>
   </dependencies>

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/c3b15af0/distribution/src/assemble/bin.xml
----------------------------------------------------------------------
diff --git a/distribution/src/assemble/bin.xml b/distribution/src/assemble/bin.xml
index c1675e9..6d7fb6b 100644
--- a/distribution/src/assemble/bin.xml
+++ b/distribution/src/assemble/bin.xml
@@ -90,7 +90,7 @@
       <unpack>false</unpack>
       <useProjectArtifact>false</useProjectArtifact>
       <includes>
-        <include>com.google.protobuf:protobuf-java:jar:2.4.1</include>
+        <include>com.google.protobuf:protobuf-java:jar:2.5</include>
       </includes>
       <scope>test</scope>
     </dependencySet>

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/c3b15af0/distribution/src/resources/drill-config.sh
----------------------------------------------------------------------
diff --git a/distribution/src/resources/drill-config.sh b/distribution/src/resources/drill-config.sh
index 795e97a..621f947 100644
--- a/distribution/src/resources/drill-config.sh
+++ b/distribution/src/resources/drill-config.sh
@@ -88,33 +88,6 @@ if [ ! -d $DRILL_LOG_DIR ]; then
   mkdir -p $DRILL_LOG_DIR
 fi
 
-# If HADOOP_HOME is specified, add all Hadoop jars except those
-# specifically excluded in $DRILL_HOME/bin/hadoop-excludes.txt
-if [ "${HADOOP_HOME}x" != "x" ] ; then
-  HADOOP_CLASSPATH=""
-  for jar in `ls $HADOOP_HOME/lib/*jar` ; do
-    echo $jar | grep -v -f $DRILL_HOME/bin/hadoop-excludes.txt >/dev/null
-    if [ "$?" -eq "0" ] ; then
-      HADOOP_CLASSPATH=$jar:$HADOOP_CLASSPATH
-    fi
-  done
-  HADOOP_CLASSPATH=$HADOOP_HOME/conf:$HADOOP_CLASSPATH
-fi
-if [ "${HBASE_HOME}x" != "x" ]
-then
-  HBASE_CLASSPATH=""
-  for jar in `ls $HBASE_HOME/*jar`
-  do
-    echo $jar | grep -v -f $DRILL_HOME/bin/hadoop-excludes.txt >/dev/null
-eq "0" ] - then - HBASE_CLASSPATH=$jar:$HBASE_CLASSPATH - fi - done - HBASE_CLASSPATH=$HBASE_HOME/conf:$HBASE_CLASSPATH - export HBASE_CLASSPATH=$HBASE_CLASSPATH -fi - # Add Drill conf folder at the beginning of the classpath CP=$DRILL_CONF_DIR http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/c3b15af0/exec/java-exec/pom.xml ---------------------------------------------------------------------- diff --git a/exec/java-exec/pom.xml b/exec/java-exec/pom.xml index 21dfc67..685f2fe 100644 --- a/exec/java-exec/pom.xml +++ b/exec/java-exec/pom.xml @@ -357,6 +357,10 @@ <groupId>com.sun.jersey</groupId> <artifactId>jersey-json</artifactId> </exclusion> + <exclusion> + <groupId>com.sun.jersey</groupId> + <artifactId>jersey-client</artifactId> + </exclusion> </exclusions> </dependency> </dependencies> @@ -367,6 +371,16 @@ <dependency> <groupId>org.apache.hadoop</groupId> <artifactId>hadoop-client</artifactId> + <exclusions> + <exclusion> + <groupId>com.sun.jersey</groupId> + <artifactId>jersey-client</artifactId> + </exclusion> + <exclusion> + <groupId>com.sun.jersey</groupId> + <artifactId>jersey-core</artifactId> + </exclusion> + </exclusions> </dependency> <dependency> <groupId>com.mapr.hadoop</groupId> @@ -393,6 +407,42 @@ <groupId>org.slf4j</groupId> <artifactId>slf4j-log4j12</artifactId> </exclusion> + <exclusion> + <groupId>com.sun.jersey</groupId> + <artifactId>jersey-client</artifactId> + </exclusion> + <exclusion> + <groupId>com.sun.jersey</groupId> + <artifactId>jersey-core</artifactId> + </exclusion> + <exclusion> + <groupId>com.sun.jersey</groupId> + <artifactId>jersey-json</artifactId> + </exclusion> + <exclusion> + <groupId>com.sun.jersey</groupId> + <artifactId>jersey-server</artifactId> + </exclusion> + <exclusion> + <groupId>org.mortbay.jetty</groupId> + <artifactId>jetty</artifactId> + </exclusion> + <exclusion> + <groupId>org.mortbay.jetty</groupId> + <artifactId>jsp-2.1</artifactId> + </exclusion> + <exclusion> + <groupId>org.mortbay.jetty</groupId> + <artifactId>jsp-api-2.1</artifactId> + </exclusion> + <exclusion> + <groupId>org.mortbay.jetty</groupId> + <artifactId>servlet-api-2.5</artifactId> + </exclusion> + <exclusion> + <groupId>org.apache.hadoop</groupId> + <artifactId>hadoop-core</artifactId> + </exclusion> </exclusions> </dependency> </dependencies> http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/c3b15af0/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet2/DrillParquetReader.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet2/DrillParquetReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet2/DrillParquetReader.java index 0d2a225..f47acab 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet2/DrillParquetReader.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet2/DrillParquetReader.java @@ -46,6 +46,7 @@ import parquet.schema.GroupType; import parquet.schema.MessageType; import parquet.schema.Type; +import java.io.IOException; import java.util.Arrays; import java.util.HashMap; import java.util.List; @@ -61,6 +62,7 @@ public class DrillParquetReader implements RecordReader { private RowGroupReadEntry entry; private List<SchemaPath> columns; private VectorContainerWriter writer; + private ColumnChunkIncReadStore pageReadStore; private parquet.io.RecordReader<Void> recordReader; private DrillParquetRecordMaterializer recordMaterializer; private int recordCount; @@ 
     recordCount = (int) blockMetaData.getRowCount();
 
-    ColumnChunkIncReadStore pageReadStore = new ColumnChunkIncReadStore(recordCount,
-        codecFactoryExposer.getCodecFactory(), fs, filePath);
+    pageReadStore = new ColumnChunkIncReadStore(recordCount,
+        codecFactoryExposer.getCodecFactory(), operatorContext.getAllocator(), fs, filePath);
 
     for (String[] path : schema.getPaths()) {
       Type type = schema.getType(path);
@@ -203,6 +205,11 @@ public class DrillParquetReader implements RecordReader {
 
   @Override
   public void cleanup() {
+    try {
+      pageReadStore.close();
+    } catch (IOException e) {
+      logger.warn("Failure while closing PageReadStore", e);
+    }
   }
 
   public void setOperatorContext(OperatorContext operatorContext) {

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/c3b15af0/exec/java-exec/src/main/java/parquet/hadoop/ColumnChunkIncReadStore.java
----------------------------------------------------------------------
diff --git a/exec/java-exec/src/main/java/parquet/hadoop/ColumnChunkIncReadStore.java b/exec/java-exec/src/main/java/parquet/hadoop/ColumnChunkIncReadStore.java
index e5a477b..379d3e6 100644
--- a/exec/java-exec/src/main/java/parquet/hadoop/ColumnChunkIncReadStore.java
+++ b/exec/java-exec/src/main/java/parquet/hadoop/ColumnChunkIncReadStore.java
@@ -17,6 +17,8 @@
  */
 package parquet.hadoop;
 
+import io.netty.buffer.ByteBuf;
+import org.apache.drill.exec.memory.BufferAllocator;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.FileSystem;
@@ -35,8 +37,10 @@ import parquet.format.converter.ParquetMetadataConverter;
 import parquet.hadoop.CodecFactory.BytesDecompressor;
 import parquet.hadoop.metadata.ColumnChunkMetaData;
 import parquet.hadoop.metadata.FileMetaData;
+import parquet.hadoop.util.CompatibilityUtil;
 
 import java.io.IOException;
+import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
@@ -48,13 +52,15 @@ public class ColumnChunkIncReadStore implements PageReadStore {
   private static ParquetMetadataConverter parquetMetadataConverter = new ParquetMetadataConverter();
 
   private CodecFactory codecFactory = new CodecFactory(new Configuration());
+  private BufferAllocator allocator;
   private FileSystem fs;
   private Path path;
   private long rowCount;
   private List<FSDataInputStream> streams = new ArrayList();
 
-  public ColumnChunkIncReadStore(long rowCount, CodecFactory codecFactory, FileSystem fs, Path path) {
+  public ColumnChunkIncReadStore(long rowCount, CodecFactory codecFactory, BufferAllocator allocator, FileSystem fs, Path path) {
     this.codecFactory = codecFactory;
+    this.allocator = allocator;
     this.fs = fs;
     this.path = path;
     this.rowCount = rowCount;
@@ -64,6 +70,7 @@ public class ColumnChunkIncReadStore implements PageReadStore {
   public class ColumnChunkIncPageReader implements PageReader {
 
     ColumnChunkMetaData metaData;
+    ColumnDescriptor columnDescriptor;
     long fileOffset;
     long size;
     private long valueReadSoFar = 0;
@@ -72,16 +79,11 @@ public class ColumnChunkIncReadStore implements PageReadStore {
     private FSDataInputStream in;
     private BytesDecompressor decompressor;
 
-    public ColumnChunkIncPageReader(ColumnChunkMetaData metaData, FSDataInputStream in) {
-      this.metaData = metaData;
-      this.size = metaData.getTotalSize();
-      this.fileOffset = metaData.getStartingPos();
-      this.in = in;
-      this.decompressor = codecFactory.getDecompressor(metaData.getCodec());
-    }
+    private ByteBuf lastPage;
 
-    public ColumnChunkIncPageReader(ColumnChunkMetaData metaData, FSDataInputStream in, CodecFactory codecFactory) {
+    public ColumnChunkIncPageReader(ColumnChunkMetaData metaData, ColumnDescriptor columnDescriptor, FSDataInputStream in) {
       this.metaData = metaData;
+      this.columnDescriptor = columnDescriptor;
       this.size = metaData.getTotalSize();
       this.fileOffset = metaData.getStartingPos();
       this.in = in;
@@ -104,13 +106,9 @@ public class ColumnChunkIncReadStore implements PageReadStore {
               pageHeader.getDictionary_page_header().getNum_values(),
               parquetMetadataConverter.getEncoding(pageHeader.dictionary_page_header.encoding)
           );
-          System.out.println(dictionaryPage);
         } catch (IOException e) {
           throw new RuntimeException(e);
         }
-//        if (dictionaryPage == null) {
-//          throw new RuntimeException("Dictionary page null");
-//        }
       }
       return dictionaryPage;
     }
@@ -123,6 +121,10 @@ public class ColumnChunkIncReadStore implements PageReadStore {
     @Override
     public Page readPage() {
       try {
+        if (lastPage != null) {
+          lastPage.release();
+          lastPage = null;
+        }
         while(valueReadSoFar < metaData.getValueCount()) {
           PageHeader pageHeader = Util.readPageHeader(in);
           switch (pageHeader.type) {
@@ -140,10 +142,15 @@ public class ColumnChunkIncReadStore implements PageReadStore {
               break;
             case DATA_PAGE:
               valueReadSoFar += pageHeader.data_page_header.getNum_values();
+              ByteBuf buf = allocator.buffer(pageHeader.compressed_page_size);
+              lastPage = buf;
+              ByteBuffer buffer = buf.nioBuffer(0, pageHeader.compressed_page_size);
+              CompatibilityUtil.getBuf(in, buffer, pageHeader.compressed_page_size);
               return new Page(
-                  decompressor.decompress(BytesInput.from(in,pageHeader.compressed_page_size), pageHeader.getUncompressed_page_size()),
+                  decompressor.decompress(BytesInput.from(buffer, 0, pageHeader.compressed_page_size), pageHeader.compressed_page_size),
                   pageHeader.data_page_header.num_values,
                   pageHeader.uncompressed_page_size,
+                  parquetMetadataConverter.fromParquetStatistics(pageHeader.data_page_header.statistics, columnDescriptor.getType()),
                   parquetMetadataConverter.getEncoding(pageHeader.data_page_header.repetition_level_encoding),
                   parquetMetadataConverter.getEncoding(pageHeader.data_page_header.definition_level_encoding),
                   parquetMetadataConverter.getEncoding(pageHeader.data_page_header.encoding)
@@ -159,6 +166,12 @@ public class ColumnChunkIncReadStore implements PageReadStore {
         throw new RuntimeException(e);
       }
     }
+
+    void close() {
+      if (lastPage != null) {
+        lastPage.release();
+      }
+    }
   }
 
   private Map<ColumnDescriptor, ColumnChunkIncPageReader> columns = new HashMap();
@@ -167,7 +180,7 @@ public class ColumnChunkIncReadStore implements PageReadStore {
     FSDataInputStream in = fs.open(path);
     streams.add(in);
     in.seek(metaData.getStartingPos());
-    ColumnChunkIncPageReader reader = new ColumnChunkIncPageReader(metaData, in);
+    ColumnChunkIncPageReader reader = new ColumnChunkIncPageReader(metaData, descriptor, in);
     columns.put(descriptor, reader);
   }
 
@@ -176,6 +189,9 @@ public class ColumnChunkIncReadStore implements PageReadStore {
     for (FSDataInputStream stream : streams) {
       stream.close();
     }
+    for (ColumnChunkIncPageReader reader : columns.values()) {
+      reader.close();
+    }
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/c3b15af0/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index a984a4f..457ef3e 100644
--- a/pom.xml
+++ b/pom.xml
@@ -507,7 +507,7 @@
       <dependency>
         <groupId>de.huxhorn.lilith</groupId>
         <artifactId>de.huxhorn.lilith.logback.appender.multiplex-classic</artifactId>
-        <version>0.9.43</version>
+        <version>0.9.44</version>
         <scope>test</scope>
       </dependency>
 
@@ -804,7 +804,7 @@
       <dependency>
         <groupId>org.apache.hbase</groupId>
         <artifactId>hbase</artifactId>
-        <version>0.94.17-mapr-1405-4.0.0-FCS</version>
+        <version>0.94.17-mapr-1405-m7-4.0.0-FCS</version>
         <exclusions>
           <exclusion>
             <groupId>commons-logging</groupId>
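
----------------------------------------------------------------------
For reference, the buffer lifecycle the new read path follows, as a minimal
self-contained Java sketch. This is not Drill's API: the class and method names
below are illustrative, a plain Netty PooledByteBufAllocator stands in for
Drill's BufferAllocator, and a ReadableByteChannel stands in for the
FSDataInputStream/CompatibilityUtil.getBuf pair used in the diff. What it
mirrors from the commit: allocate a buffer per data page, fill it completely
before handing it to the decompressor, release the previous page's buffer at
the start of the next readPage() call, and release the final buffer on close.

import io.netty.buffer.ByteBuf;
import io.netty.buffer.PooledByteBufAllocator;

import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.ReadableByteChannel;

// Illustrative only -- not Drill's API.
class PageBufferLifecycle {

  private final PooledByteBufAllocator allocator = PooledByteBufAllocator.DEFAULT;
  private ByteBuf lastPage; // buffer backing the most recently returned page

  ByteBuffer readPage(ReadableByteChannel in, int compressedPageSize) throws IOException {
    // The caller has consumed the previous page by the time it asks for the
    // next one, so the buffer backing it can be released here.
    if (lastPage != null) {
      lastPage.release();
      lastPage = null;
    }
    ByteBuf buf = allocator.directBuffer(compressedPageSize);
    lastPage = buf;
    ByteBuffer buffer = buf.nioBuffer(0, compressedPageSize);
    // A single read() may return fewer bytes than requested; loop until the
    // page is complete (CompatibilityUtil.getBuf plays this role in the diff).
    while (buffer.hasRemaining()) {
      if (in.read(buffer) < 0) {
        throw new IOException("Unexpected end of stream while reading page");
      }
    }
    buffer.flip();
    return buffer; // handed to the decompressor in the real read path
  }

  // Mirrors ColumnChunkIncPageReader.close(): the last page returned still
  // holds a buffer that no later readPage() call will release.
  void close() {
    if (lastPage != null) {
      lastPage.release();
      lastPage = null;
    }
  }
}

Releasing in the next readPage() call works because a page consumer finishes
one page before requesting the next; close() then covers the buffer behind the
final page, which is why DrillParquetReader.cleanup() must close the store.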
