DRILL-1281: Read into Direct Memory in Parquet Reader. Requires Hadoop 2.4 or above
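For orientation, a minimal sketch (not from the patch itself) of the direct-memory read path this change relies on: Hadoop 2.4+ exposes a ByteBuffer read on FSDataInputStream, which lets page bytes land in off-heap memory without an intermediate on-heap byte[] copy. The patch reaches that API through parquet's CompatibilityUtil.getBuf; the sketch below calls Hadoop directly. The class name, offset, and length are illustrative, and the underlying stream must actually support ByteBufferReadable, as HDFS streams do on Hadoop 2.4+.

import java.io.IOException;
import java.nio.ByteBuffer;

import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class DirectPageReadSketch {
  // Reads pageLength bytes at 'offset' into a direct ByteBuffer, avoiding an
  // intermediate on-heap byte[] copy. Requires the underlying stream to
  // implement ByteBufferReadable (true for HDFS streams on Hadoop 2.4+).
  public static ByteBuffer readIntoDirectMemory(FileSystem fs, Path file, long offset, int pageLength)
      throws IOException {
    ByteBuffer direct = ByteBuffer.allocateDirect(pageLength);
    try (FSDataInputStream in = fs.open(file)) {
      in.seek(offset);
      while (direct.hasRemaining()) {
        if (in.read(direct) < 0) {          // ByteBuffer read path added in Hadoop 2.x
          throw new IOException("Unexpected end of stream while reading page");
        }
      }
    }
    direct.flip();                          // prepare the buffer for consumption
    return direct;
  }
}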
Project: http://git-wip-us.apache.org/repos/asf/incubator-drill/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-drill/commit/02e4824e
Tree: http://git-wip-us.apache.org/repos/asf/incubator-drill/tree/02e4824e
Diff: http://git-wip-us.apache.org/repos/asf/incubator-drill/diff/02e4824e

Branch: refs/heads/master
Commit: 02e4824ed9da6e15b5f974f16b15b408695c8ada
Parents: 654c879
Author: Parth Chandra <[email protected]>
Authored: Mon Jul 28 13:44:52 2014 -0700
Committer: Steven Phillips <[email protected]>
Committed: Mon Aug 18 14:46:26 2014 -0700

----------------------------------------------------------------------
 .../drill/common/util/DecimalUtility.java       |   9 +-
 contrib/storage-hbase/pom.xml                   |  48 +++++
 .../exec/store/hbase/HBaseRecordReader.java     |  14 ++
 .../drill/exec/store/hive/HiveRecordReader.java |  13 ++
 exec/java-exec/pom.xml                          | 112 ++++++++++--
 .../codegen/templates/HolderReaderImpl.java     |   1 -
 .../drill/exec/physical/impl/ScanBatch.java     |   6 +-
 .../exec/store/LocalSyncableFileSystem.java     |  12 ++
 .../apache/drill/exec/store/RecordReader.java   |   9 +
 .../exec/store/easy/json/JSONRecordReader2.java |  14 ++
 .../drill/exec/store/mock/MockRecordReader.java |  13 ++
 .../exec/store/parquet/ColumnDataReader.java    |  26 ++-
 .../store/parquet/ParquetScanBatchCreator.java  |   8 +-
 .../store/parquet/columnreaders/BitReader.java  |  11 +-
 .../parquet/columnreaders/ColumnReader.java     |   8 +-
 .../columnreaders/FixedByteAlignedReader.java   |  15 +-
 .../NullableFixedByteAlignedReaders.java        |  33 ++--
 .../NullableVarLengthValuesColumn.java          |   5 +-
 .../store/parquet/columnreaders/PageReader.java | 127 ++++++++++---
 .../columnreaders/ParquetRecordReader.java      |  13 ++
 .../columnreaders/VarLengthColumnReaders.java   |  98 +++++++---
 .../columnreaders/VarLengthValuesColumn.java    |   5 +-
 .../exec/store/parquet2/DrillParquetReader.java |   9 +
 .../drill/exec/store/pojo/PojoRecordReader.java |  10 ++
 .../exec/store/text/DrillTextRecordReader.java  |  14 +-
 .../parquet/hadoop/CodecFactoryExposer.java     |  96 +++++++++-
 .../physical/impl/writer/TestParquetWriter.java |   4 +-
 exec/jdbc/pom.xml                               |  14 +-
 pom.xml                                         | 180 +++++++++++--------
 29 files changed, 747 insertions(+), 180 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/02e4824e/common/src/main/java/org/apache/drill/common/util/DecimalUtility.java
----------------------------------------------------------------------
diff --git a/common/src/main/java/org/apache/drill/common/util/DecimalUtility.java b/common/src/main/java/org/apache/drill/common/util/DecimalUtility.java
index 8311d82..85ba918 100644
--- a/common/src/main/java/org/apache/drill/common/util/DecimalUtility.java
+++ b/common/src/main/java/org/apache/drill/common/util/DecimalUtility.java
@@ -553,13 +553,20 @@ public class DecimalUtility {
     return 0;
   }
 
-
   public static BigDecimal getBigDecimalFromByteArray(byte[] bytes, int start, int length, int scale) {
     byte[] value = Arrays.copyOfRange(bytes, start, start + length);
     BigInteger unscaledValue = new BigInteger(value);
     return new BigDecimal(unscaledValue, scale);
   }
 
+
+  public static BigDecimal getBigDecimalFromByteBuf(ByteBuf bytebuf, int start, int length, int scale) {
+    byte[] value = new byte[length];
+    bytebuf.getBytes(start, value, 0, length);
+    BigInteger unscaledValue = new BigInteger(value);
+    return new BigDecimal(unscaledValue, scale);
+  }
+
   public static void roundDecimal(ByteBuf result, int start, int nDecimalDigits, int desiredScale, int currentScale) {
     int
newScaleRoundedUp = org.apache.drill.common.util.DecimalUtility.roundUp(desiredScale); int origScaleRoundedUp = org.apache.drill.common.util.DecimalUtility.roundUp(currentScale); http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/02e4824e/contrib/storage-hbase/pom.xml ---------------------------------------------------------------------- diff --git a/contrib/storage-hbase/pom.xml b/contrib/storage-hbase/pom.xml index c6043a3..82bf4bb 100644 --- a/contrib/storage-hbase/pom.xml +++ b/contrib/storage-hbase/pom.xml @@ -37,6 +37,12 @@ <groupId>org.apache.drill.exec</groupId> <artifactId>drill-java-exec</artifactId> <version>${project.version}</version> + <exclusions> + <exclusion> + <artifactId>hadoop-client</artifactId> + <groupId>org.apache.hadoop</groupId> + </exclusion> + </exclusions> </dependency> <!-- Test dependencies --> @@ -46,6 +52,17 @@ <classifier>tests</classifier> <version>${project.version}</version> <scope>test</scope> + <!-- here we explicitly exclude hadoop v2.x dependencies + because HBase 0.94.x unit tests do not work with hadoop 2 + test classes, e.g. MiniDFSCluster. + This will change once we move to HBase 0.98 or later. + --> + <exclusions> + <exclusion> + <artifactId>hadoop-client</artifactId> + <groupId>org.apache.hadoop</groupId> + </exclusion> + </exclusions> </dependency> <dependency> <groupId>org.apache.drill</groupId> @@ -146,10 +163,37 @@ </exclusions> </dependency> <dependency> + <!-- contrary to other Drill modules, this HBase storage module + compiles against hadoop 1.x to allow running a mini HBase + cluster to run the unit tests. + This will change once we move to HBase 0.98 or later. + --> + <groupId>org.apache.hadoop</groupId> + <artifactId>hadoop-core</artifactId> + <version>1.2.1</version> + <exclusions> + <exclusion> + <artifactId>commons-logging</artifactId> + <groupId>commons-logging</groupId> + </exclusion> + <exclusion> + <artifactId>asm</artifactId> + <groupId>asm</groupId> + </exclusion> + </exclusions> + </dependency> + <!-- test dependencies --> + <dependency> <groupId>org.apache.hadoop</groupId> <artifactId>hadoop-test</artifactId> <version>1.2.1</version> <scope>test</scope> + <exclusions> + <exclusion> + <artifactId>commons-logging</artifactId> + <groupId>commons-logging</groupId> + </exclusion> + </exclusions> </dependency> <dependency> <groupId>org.apache.hbase</groupId> @@ -230,6 +274,10 @@ <artifactId>jruby-complete</artifactId> <groupId>org.jruby</groupId> </exclusion> + <exclusion> + <artifactId>hadoop-core</artifactId> + <groupId>org.apache</groupId> + </exclusion> </exclusions> </dependency> <dependency> http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/02e4824e/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseRecordReader.java ---------------------------------------------------------------------- diff --git a/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseRecordReader.java b/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseRecordReader.java index 204d486..954cc5a 100644 --- a/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseRecordReader.java +++ b/contrib/storage-hbase/src/main/java/org/apache/drill/exec/store/hbase/HBaseRecordReader.java @@ -33,6 +33,7 @@ import org.apache.drill.common.expression.SchemaPath; import org.apache.drill.exec.exception.SchemaChangeException; import org.apache.drill.exec.memory.OutOfMemoryException; import org.apache.drill.exec.ops.FragmentContext; +import 
org.apache.drill.exec.ops.OperatorContext; import org.apache.drill.exec.physical.impl.OutputMutator; import org.apache.drill.exec.record.MaterializedField; import org.apache.drill.exec.store.RecordReader; @@ -70,12 +71,16 @@ public class HBaseRecordReader implements RecordReader, DrillHBaseConstants { private Scan hbaseScan; private Configuration hbaseConf; private Result leftOver; + private FragmentContext fragmentContext; + private OperatorContext operatorContext; + public HBaseRecordReader(Configuration conf, HBaseSubScan.HBaseSubScanSpec subScanSpec, List<SchemaPath> projectedColumns, FragmentContext context) throws OutOfMemoryException { hbaseConf = conf; hbaseTable = subScanSpec.getTableName(); hbaseScan = new Scan(subScanSpec.getStartRow(), subScanSpec.getStopRow()); + fragmentContext=context; boolean rowKeyOnly = true; this.columns = Sets.newLinkedHashSet(); if (projectedColumns != null && projectedColumns.size() != 0) { @@ -117,6 +122,15 @@ public class HBaseRecordReader implements RecordReader, DrillHBaseConstants { hbaseScan.setCaching(TARGET_RECORD_COUNT); } + public OperatorContext getOperatorContext() { + return operatorContext; + } + + public void setOperatorContext(OperatorContext operatorContext) { + this.operatorContext = operatorContext; + } + + @Override public void setup(OutputMutator output) throws ExecutionSetupException { this.outputMutator = output; http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/02e4824e/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveRecordReader.java ---------------------------------------------------------------------- diff --git a/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveRecordReader.java b/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveRecordReader.java index ac0fe36..36e55d9 100644 --- a/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveRecordReader.java +++ b/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveRecordReader.java @@ -34,6 +34,7 @@ import org.apache.drill.common.types.Types; import org.apache.drill.exec.exception.SchemaChangeException; import org.apache.drill.exec.expr.TypeHelper; import org.apache.drill.exec.ops.FragmentContext; +import org.apache.drill.exec.ops.OperatorContext; import org.apache.drill.exec.physical.impl.OutputMutator; import org.apache.drill.exec.record.MaterializedField; import org.apache.drill.exec.store.RecordReader; @@ -101,6 +102,9 @@ public class HiveRecordReader implements RecordReader { protected Object redoRecord; protected boolean empty; private Map<String, String> hiveConfigOverride; + private FragmentContext fragmentContext; + private OperatorContext operatorContext; + protected static final int TARGET_RECORD_COUNT = 4000; protected static final int FIELD_SIZE = 50; @@ -114,6 +118,7 @@ public class HiveRecordReader implements RecordReader { this.projectedColumns = projectedColumns; this.empty = (inputSplit == null && partition == null); this.hiveConfigOverride = hiveConfigOverride; + this.fragmentContext=context; init(); } @@ -229,6 +234,14 @@ public class HiveRecordReader implements RecordReader { } } + public OperatorContext getOperatorContext() { + return operatorContext; + } + + public void setOperatorContext(OperatorContext operatorContext) { + this.operatorContext = operatorContext; + } + @Override public void setup(OutputMutator output) throws ExecutionSetupException { try { 
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/02e4824e/exec/java-exec/pom.xml ---------------------------------------------------------------------- diff --git a/exec/java-exec/pom.xml b/exec/java-exec/pom.xml index 5465f12..21dfc67 100644 --- a/exec/java-exec/pom.xml +++ b/exec/java-exec/pom.xml @@ -49,6 +49,11 @@ <version>2.1</version> </dependency> <dependency> + <groupId>org.apache.commons</groupId> + <artifactId>commons-math</artifactId> + <version>2.2</version> + </dependency> + <dependency> <groupId>com.thoughtworks.paranamer</groupId> <artifactId>paranamer</artifactId> <version>2.5.6</version> @@ -120,31 +125,106 @@ <dependency> <groupId>com.twitter</groupId> <artifactId>parquet-column</artifactId> - <version>1.5.0-SNAPSHOT</version> + <version>1.5.1-drill-r2</version> <exclusions> <exclusion> <groupId>org.apache.hadoop</groupId> - <artifactId>hadoop-core</artifactId> + <artifactId>hadoop-client</artifactId> </exclusion> <exclusion> <groupId>org.apache.hadoop</groupId> - <artifactId>hadoop-client</artifactId> + <artifactId>hadoop-common</artifactId> </exclusion> </exclusions> </dependency> <dependency> <groupId>com.twitter</groupId> <artifactId>parquet-hadoop</artifactId> - <version>1.5.0-SNAPSHOT</version> + <version>1.5.1-drill-r2</version> <exclusions> <exclusion> <groupId>org.apache.hadoop</groupId> - <artifactId>hadoop-core</artifactId> + <artifactId>hadoop-client</artifactId> + </exclusion> + <exclusion> + <groupId>org.apache.hadoop</groupId> + <artifactId>hadoop-common</artifactId> </exclusion> + </exclusions> + </dependency> + <dependency> + <groupId>com.twitter</groupId> + <artifactId>parquet-format</artifactId> + <version>2.1.1-SNAPSHOT</version> + <exclusions> <exclusion> <groupId>org.apache.hadoop</groupId> <artifactId>hadoop-client</artifactId> </exclusion> + <exclusion> + <groupId>org.apache.hadoop</groupId> + <artifactId>hadoop-common</artifactId> + </exclusion> + </exclusions> + </dependency> + <dependency> + <groupId>com.twitter</groupId> + <artifactId>parquet-common</artifactId> + <version>1.5.1-drill-r2</version> + <exclusions> + <exclusion> + <groupId>org.apache.hadoop</groupId> + <artifactId>hadoop-client</artifactId> + </exclusion> + <exclusion> + <groupId>org.apache.hadoop</groupId> + <artifactId>hadoop-common</artifactId> + </exclusion> + </exclusions> + </dependency> + <dependency> + <groupId>com.twitter</groupId> + <artifactId>parquet-jackson</artifactId> + <version>1.5.1-drill-r2</version> + <exclusions> + <exclusion> + <groupId>org.apache.hadoop</groupId> + <artifactId>hadoop-client</artifactId> + </exclusion> + <exclusion> + <groupId>org.apache.hadoop</groupId> + <artifactId>hadoop-common</artifactId> + </exclusion> + </exclusions> + </dependency> + <dependency> + <groupId>com.twitter</groupId> + <artifactId>parquet-encoding</artifactId> + <version>1.5.1-drill-r2</version> + <exclusions> + <exclusion> + <groupId>org.apache.hadoop</groupId> + <artifactId>hadoop-client</artifactId> + </exclusion> + <exclusion> + <groupId>org.apache.hadoop</groupId> + <artifactId>hadoop-common</artifactId> + </exclusion> + </exclusions> + </dependency> + <dependency> + <groupId>com.twitter</groupId> + <artifactId>parquet-generator</artifactId> + <version>1.5.1-drill-r2</version> + <exclusions> + <exclusion> + <groupId>org.apache.hadoop</groupId> + <artifactId>hadoop-client</artifactId> + </exclusion> + <exclusion> + <groupId>org.apache.hadoop</groupId> + <artifactId>hadoop-common</artifactId> + </exclusion> </exclusions> </dependency> <dependency> @@ -267,7 
+347,7 @@ <dependencies> <dependency> <groupId>org.apache.hadoop</groupId> - <artifactId>hadoop-core</artifactId> + <artifactId>hadoop-client</artifactId> <exclusions> <exclusion> <groupId>com.sun.jersey</groupId> @@ -279,7 +359,6 @@ </exclusion> </exclusions> </dependency> - </dependencies> </profile> <profile> @@ -287,17 +366,28 @@ <dependencies> <dependency> <groupId>org.apache.hadoop</groupId> - <artifactId>hadoop-core</artifactId> + <artifactId>hadoop-client</artifactId> </dependency> <dependency> <groupId>com.mapr.hadoop</groupId> - <artifactId>maprfs</artifactId> - <version>1.0.3-mapr-3.0.0</version> + <artifactId>maprfs</artifactId> + <version>2.4.1-mapr-4.0.0-SNAPSHOT</version> + <exclusions> + <exclusion> + <artifactId>commons-logging</artifactId> + <groupId>commons-logging</groupId> + </exclusion> + </exclusions> + </dependency> + <dependency> + <groupId>org.json</groupId> + <artifactId>json</artifactId> + <version>20090211</version> </dependency> <dependency> <groupId>org.apache.hbase</groupId> <artifactId>hbase</artifactId> - <version>0.94.13-mapr-1401-m7-3.0.2</version> + <version>0.94.17-mapr-1405-4.0.0-FCS</version> <exclusions> <exclusion> <groupId>org.slf4j</groupId> http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/02e4824e/exec/java-exec/src/main/codegen/templates/HolderReaderImpl.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/codegen/templates/HolderReaderImpl.java b/exec/java-exec/src/main/codegen/templates/HolderReaderImpl.java index 5f718a0..bd862a1 100644 --- a/exec/java-exec/src/main/codegen/templates/HolderReaderImpl.java +++ b/exec/java-exec/src/main/codegen/templates/HolderReaderImpl.java @@ -44,7 +44,6 @@ import java.math.BigInteger; import org.apache.drill.exec.expr.holders.*; import org.apache.hadoop.io.Text; import org.joda.time.Period; -import org.mortbay.jetty.servlet.Holder; @SuppressWarnings("unused") public class ${holderMode}${name}HolderReaderImpl extends AbstractFieldReader { http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/02e4824e/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScanBatch.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScanBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScanBatch.java index eaf26d1..4b7e33e 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScanBatch.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScanBatch.java @@ -95,6 +95,7 @@ public class ScanBatch implements RecordBatch { throw new ExecutionSetupException("A scan batch must contain at least one reader."); this.currentReader = readers.next(); this.oContext = new OperatorContext(subScanConfig, context); + this.currentReader.setOperatorContext(this.oContext); this.currentReader.setup(mutator); this.partitionColumns = partitionColumns.iterator(); this.partitionValues = this.partitionColumns.hasNext() ? 
this.partitionColumns.next() : null; @@ -108,11 +109,14 @@ public class ScanBatch implements RecordBatch { this(subScanConfig, context, readers, Collections.<String[]> emptyList(), Collections.<Integer> emptyList()); } - @Override public FragmentContext getContext() { return context; } + public OperatorContext getOperatorContext() { + return oContext; + } + @Override public BatchSchema getSchema() { return schema; http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/02e4824e/exec/java-exec/src/main/java/org/apache/drill/exec/store/LocalSyncableFileSystem.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/LocalSyncableFileSystem.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/LocalSyncableFileSystem.java index 1e55c73..37b9c9d 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/LocalSyncableFileSystem.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/LocalSyncableFileSystem.java @@ -144,6 +144,18 @@ public class LocalSyncableFileSystem extends FileSystem { } @Override + public void hsync() throws IOException { + output.flush(); + fos.getFD().sync(); + } + + @Override + public void hflush() throws IOException { + output.flush(); + fos.getFD().sync(); + } + + @Override public void write(int b) throws IOException { output.write(b); } http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/02e4824e/exec/java-exec/src/main/java/org/apache/drill/exec/store/RecordReader.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/RecordReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/RecordReader.java index fb8a014..1745421 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/RecordReader.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/RecordReader.java @@ -18,6 +18,7 @@ package org.apache.drill.exec.store; import org.apache.drill.common.exceptions.ExecutionSetupException; +import org.apache.drill.exec.ops.OperatorContext; import org.apache.drill.exec.physical.impl.OutputMutator; public interface RecordReader { @@ -36,6 +37,14 @@ public interface RecordReader { public abstract void setup(OutputMutator output) throws ExecutionSetupException; /** + * Set the operator context. The Reader can use this to access the operator context and allocate direct memory + * if needed + * @param operatorContext + */ + public abstract void setOperatorContext(OperatorContext operatorContext); + + + /** * Increment record reader forward, writing into the provided output batch. * * @return The number of additional records added to the output. 
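For orientation, a minimal sketch (not part of the commit) of how a reader uses the OperatorContext hook added above: ScanBatch and ParquetScanBatchCreator call setOperatorContext() before setup(), and the reader then draws its off-heap buffers from operatorContext.getAllocator().buffer(...) and releases them on cleanup, much as PageReader does further down. The class below is illustrative, omits the rest of the RecordReader contract, and the 64 KiB buffer size is an assumption.

import io.netty.buffer.ByteBuf;

import org.apache.drill.exec.ops.OperatorContext;

public class DirectBufferAllocationSketch {
  private OperatorContext operatorContext;
  private ByteBuf readBuffer;

  // Mirrors the setOperatorContext() method added to RecordReader in this commit;
  // the scan operator calls it before setup().
  public void setOperatorContext(OperatorContext operatorContext) {
    this.operatorContext = operatorContext;
  }

  public void setup() {
    // Allocate direct (off-heap) memory from the operator's allocator rather than
    // the JVM heap; the size here is illustrative.
    readBuffer = operatorContext.getAllocator().buffer(64 * 1024);
  }

  public void cleanup() {
    if (readBuffer != null) {
      readBuffer.release();   // hand the direct memory back to the allocator
    }
  }
}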
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/02e4824e/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/JSONRecordReader2.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/JSONRecordReader2.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/JSONRecordReader2.java index 3e2c81c..cd3cfdc 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/JSONRecordReader2.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/JSONRecordReader2.java @@ -28,6 +28,7 @@ import org.apache.drill.common.expression.SchemaPath; import org.apache.drill.exec.exception.SchemaChangeException; import org.apache.drill.exec.memory.OutOfMemoryException; import org.apache.drill.exec.ops.FragmentContext; +import org.apache.drill.exec.ops.OperatorContext; import org.apache.drill.exec.physical.impl.OutputMutator; import org.apache.drill.exec.store.RecordReader; import org.apache.drill.exec.vector.complex.fn.JsonReader; @@ -50,11 +51,15 @@ public class JSONRecordReader2 implements RecordReader{ private InputStream stream; private JsonReaderWithState jsonReader; private int recordCount; + private FragmentContext fragmentContext; + private OperatorContext operatorContext; + public JSONRecordReader2(FragmentContext fragmentContext, String inputPath, FileSystem fileSystem, List<SchemaPath> columns) throws OutOfMemoryException { this.hadoopPath = new Path(inputPath); this.fileSystem = fileSystem; + this.fragmentContext=fragmentContext; } @Override @@ -80,6 +85,15 @@ public class JSONRecordReader2 implements RecordReader{ throw new DrillRuntimeException(sb.toString(), e); } + + public OperatorContext getOperatorContext() { + return operatorContext; + } + + public void setOperatorContext(OperatorContext operatorContext) { + this.operatorContext = operatorContext; + } + @Override public int next() { writer.allocate(); http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/02e4824e/exec/java-exec/src/main/java/org/apache/drill/exec/store/mock/MockRecordReader.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/mock/MockRecordReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/mock/MockRecordReader.java index c7fc939..81505ae 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/mock/MockRecordReader.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/mock/MockRecordReader.java @@ -27,6 +27,7 @@ import org.apache.drill.exec.expr.TypeHelper; import org.apache.drill.exec.memory.BufferAllocator; import org.apache.drill.exec.memory.OutOfMemoryException; import org.apache.drill.exec.ops.FragmentContext; +import org.apache.drill.exec.ops.OperatorContext; import org.apache.drill.exec.physical.impl.OutputMutator; import org.apache.drill.exec.record.MaterializedField; import org.apache.drill.exec.store.RecordReader; @@ -47,10 +48,14 @@ public class MockRecordReader implements RecordReader { private ValueVector[] valueVectors; private int recordsRead; private int batchRecordCount; + private FragmentContext fragmentContext; + private OperatorContext operatorContext; + public MockRecordReader(FragmentContext context, MockScanEntry config) throws OutOfMemoryException { this.context = context; this.config = config; + this.fragmentContext=context; } private int getEstimatedRecordSize(MockColumn[] 
types) { @@ -69,6 +74,14 @@ public class MockRecordReader implements RecordReader { } + public OperatorContext getOperatorContext() { + return operatorContext; + } + + public void setOperatorContext(OperatorContext operatorContext) { + this.operatorContext = operatorContext; + } + @Override public void setup(OutputMutator output) throws ExecutionSetupException { try { http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/02e4824e/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ColumnDataReader.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ColumnDataReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ColumnDataReader.java index 80fbd80..2567c89 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ColumnDataReader.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ColumnDataReader.java @@ -17,9 +17,16 @@ */ package org.apache.drill.exec.store.parquet; +import java.io.EOFException; import java.io.IOException; import java.io.OutputStream; +import java.nio.ByteBuffer; +import java.nio.channels.Channels; +import java.nio.channels.WritableByteChannel; +import io.netty.buffer.ByteBuf; +import io.netty.buffer.Unpooled; +import org.apache.drill.common.exceptions.DrillRuntimeException; import org.apache.hadoop.fs.FSDataInputStream; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; @@ -27,6 +34,7 @@ import org.apache.hadoop.fs.Path; import parquet.bytes.BytesInput; import parquet.format.PageHeader; import parquet.format.Util; +import parquet.hadoop.util.CompatibilityUtil; public class ColumnDataReader { static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(ColumnDataReader.class); @@ -43,13 +51,26 @@ public class ColumnDataReader { public PageHeader readPageHeader() throws IOException{ return Util.readPageHeader(input); } - + public BytesInput getPageAsBytesInput(int pageLength) throws IOException{ byte[] b = new byte[pageLength]; input.read(b); return new HadoopBytesInput(b); } - + + public ByteBuf getPageAsBytesBuf(ByteBuf byteBuf, int pageLength) throws IOException{ + ByteBuffer directBuffer=byteBuf.nioBuffer(0, pageLength); + int l=directBuffer.remaining(); + int bl=byteBuf.capacity(); + try{ + CompatibilityUtil.getBuf(input, directBuffer, pageLength); + }catch(Exception e) { + logger.error("Failed to read data into Direct ByteBuffer with exception: "+e.getMessage()); + throw new DrillRuntimeException(e.getMessage()); + } + return byteBuf; + } + public void clear(){ try{ input.close(); @@ -87,4 +108,5 @@ public class ColumnDataReader { } } + } http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/02e4824e/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetScanBatchCreator.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetScanBatchCreator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetScanBatchCreator.java index 5edd3c5..a453e66 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetScanBatchCreator.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/ParquetScanBatchCreator.java @@ -143,7 +143,13 @@ public class ParquetScanBatchCreator implements BatchCreator<ParquetRowGroupScan } } - return new ScanBatch(rowGroupScan, context, 
readers.iterator(), partitionColumns, selectedPartitionColumns); + ScanBatch s = new ScanBatch(rowGroupScan, context, readers.iterator(), partitionColumns, selectedPartitionColumns); + + for(RecordReader r : readers){ + r.setOperatorContext(s.getOperatorContext()); + } + + return s; } private static boolean isComplex(ParquetMetadata footer) { http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/02e4824e/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/BitReader.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/BitReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/BitReader.java index 2c6e488..43e270a 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/BitReader.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/BitReader.java @@ -17,6 +17,7 @@ */ package org.apache.drill.exec.store.parquet.columnreaders; +import io.netty.buffer.ByteBuf; import org.apache.drill.common.exceptions.ExecutionSetupException; import org.apache.drill.exec.vector.BaseDataValueVector; import org.apache.drill.exec.vector.ValueVector; @@ -28,7 +29,7 @@ final class BitReader extends ColumnReader { private byte currentByte; private byte nextByte; - private byte[] bytes; + private ByteBuf bytebuf; BitReader(ParquetRecordReader parentReader, int allocateSize, ColumnDescriptor descriptor, ColumnChunkMetaData columnChunkMetaData, boolean fixedLength, ValueVector v, SchemaElement schemaElement) throws ExecutionSetupException { @@ -45,15 +46,15 @@ final class BitReader extends ColumnReader { readLengthInBits = recordsReadInThisIteration * dataTypeLengthInBits; readLength = (int) Math.ceil(readLengthInBits / 8.0); - bytes = pageReader.pageDataByteArray; + bytebuf = pageReader.pageDataByteArray; // standard read, using memory mapping if (pageReader.bitShift == 0) { - ((BaseDataValueVector) valueVec).getData().writeBytes(bytes, + ((BaseDataValueVector) valueVec).getData().writeBytes(bytebuf, (int) readStartInBytes, (int) readLength); } else { // read in individual values, because a bitshift is necessary with where the last page or batch ended vectorData = ((BaseDataValueVector) valueVec).getData(); - nextByte = bytes[(int) Math.max(0, Math.ceil(pageReader.valuesRead / 8.0) - 1)]; + nextByte = bytebuf.getByte((int) Math.max(0, Math.ceil(pageReader.valuesRead / 8.0) - 1)); readLengthInBits = recordsReadInThisIteration + pageReader.bitShift; int i = 0; @@ -66,7 +67,7 @@ final class BitReader extends ColumnReader { // if we are not on the last byte if ((int) Math.ceil(pageReader.valuesRead / 8.0) + i < pageReader.byteLength) { // grab the next byte from the buffer, shift and mask it, and OR it with the leftover bits - nextByte = bytes[(int) Math.ceil(pageReader.valuesRead / 8.0) + i]; + nextByte = bytebuf.getByte((int) Math.ceil(pageReader.valuesRead / 8.0) + i); currentByte = (byte) (currentByte | nextByte << (8 - pageReader.bitShift) & ParquetRecordReader.endBitMasks[8 - pageReader.bitShift - 1]); http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/02e4824e/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ColumnReader.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ColumnReader.java 
b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ColumnReader.java index fd672d6..b240407 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ColumnReader.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ColumnReader.java @@ -100,10 +100,12 @@ public abstract class ColumnReader<V extends ValueVector> { public void processPages(long recordsToReadInThisPass) throws IOException { reset(); - do { - determineSize(recordsToReadInThisPass, 0); + if(recordsToReadInThisPass>0) { + do { + determineSize(recordsToReadInThisPass, 0); - } while (valuesReadInCurrentPass < recordsToReadInThisPass && pageReader.currentPage != null); + } while (valuesReadInCurrentPass < recordsToReadInThisPass && pageReader.currentPage != null); + } valueVec.getMutator().setValueCount(valuesReadInCurrentPass); } http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/02e4824e/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/FixedByteAlignedReader.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/FixedByteAlignedReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/FixedByteAlignedReader.java index 4513aaa..5c25e11 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/FixedByteAlignedReader.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/FixedByteAlignedReader.java @@ -17,6 +17,7 @@ */ package org.apache.drill.exec.store.parquet.columnreaders; +import io.netty.buffer.ByteBuf; import org.apache.drill.common.exceptions.ExecutionSetupException; import org.apache.drill.common.util.DecimalUtility; import org.apache.drill.exec.expr.holders.Decimal28SparseHolder; @@ -35,7 +36,7 @@ import java.math.BigDecimal; class FixedByteAlignedReader extends ColumnReader { - protected byte[] bytes; + protected ByteBuf bytebuf; FixedByteAlignedReader(ParquetRecordReader parentReader, int allocateSize, ColumnDescriptor descriptor, ColumnChunkMetaData columnChunkMetaData, @@ -54,13 +55,13 @@ class FixedByteAlignedReader extends ColumnReader { readLengthInBits = recordsReadInThisIteration * dataTypeLengthInBits; readLength = (int) Math.ceil(readLengthInBits / 8.0); - bytes = pageReader.pageDataByteArray; + bytebuf = pageReader.pageDataByteArray; // vectorData is assigned by the superclass read loop method writeData(); } protected void writeData() { - vectorData.writeBytes(bytes, + vectorData.writeBytes(bytebuf, (int) readStartInBytes, (int) readLength); } @@ -81,7 +82,7 @@ class FixedByteAlignedReader extends ColumnReader { } /** - * Reads from bytes, converts, and writes to buffer + * Reads from bytebuf, converts, and writes to buffer * @param start the index in bytes to start reading from * @param index the index of the ValueVector */ @@ -101,7 +102,7 @@ class FixedByteAlignedReader extends ColumnReader { @Override void addNext(int start, int index) { dateVector.getMutator().set(index, DateTimeUtils.fromJulianDay( - NullableFixedByteAlignedReaders.NullableDateReader.readIntLittleEndian(bytes, start) + NullableFixedByteAlignedReaders.NullableDateReader.readIntLittleEndian(bytebuf, start) - ParquetOutputRecordWriter.JULIAN_DAY_EPOC - 0.5)); } } @@ -119,7 +120,7 @@ class FixedByteAlignedReader extends ColumnReader { @Override void addNext(int start, int index) { int width 
= Decimal28SparseHolder.WIDTH; - BigDecimal intermediate = DecimalUtility.getBigDecimalFromByteArray(bytes, start, dataTypeLengthInBytes, schemaElement.getScale()); + BigDecimal intermediate = DecimalUtility.getBigDecimalFromByteBuf(bytebuf, start, dataTypeLengthInBytes, schemaElement.getScale()); DecimalUtility.getSparseFromBigDecimal(intermediate, decimal28Vector.getData(), index * width, schemaElement.getScale(), schemaElement.getPrecision(), Decimal28SparseHolder.nDecimalDigits); } @@ -138,7 +139,7 @@ class FixedByteAlignedReader extends ColumnReader { @Override void addNext(int start, int index) { int width = Decimal38SparseHolder.WIDTH; - BigDecimal intermediate = DecimalUtility.getBigDecimalFromByteArray(bytes, start, dataTypeLengthInBytes, schemaElement.getScale()); + BigDecimal intermediate = DecimalUtility.getBigDecimalFromByteBuf(bytebuf, start, dataTypeLengthInBytes, schemaElement.getScale()); DecimalUtility.getSparseFromBigDecimal(intermediate, decimal38Vector.getData(), index * width, schemaElement.getScale(), schemaElement.getPrecision(), Decimal38SparseHolder.nDecimalDigits); } http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/02e4824e/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/NullableFixedByteAlignedReaders.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/NullableFixedByteAlignedReaders.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/NullableFixedByteAlignedReaders.java index c1575de..4d7f312 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/NullableFixedByteAlignedReaders.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/NullableFixedByteAlignedReaders.java @@ -17,6 +17,7 @@ */ package org.apache.drill.exec.store.parquet.columnreaders; +import io.netty.buffer.ByteBuf; import org.apache.drill.common.exceptions.ExecutionSetupException; import org.apache.drill.common.util.DecimalUtility; import org.apache.drill.exec.vector.NullableBigIntVector; @@ -41,7 +42,7 @@ import java.math.BigDecimal; public class NullableFixedByteAlignedReaders { static class NullableFixedByteAlignedReader extends NullableColumnReader { - protected byte[] bytes; + protected ByteBuf bytebuf; NullableFixedByteAlignedReader(ParquetRecordReader parentReader, int allocateSize, ColumnDescriptor descriptor, ColumnChunkMetaData columnChunkMetaData, boolean fixedLength, ValueVector v, SchemaElement schemaElement) throws ExecutionSetupException { @@ -57,16 +58,16 @@ public class NullableFixedByteAlignedReaders { this.readStartInBytes = pageReader.readPosInBytes; this.readLengthInBits = recordsReadInThisIteration * dataTypeLengthInBits; this.readLength = (int) Math.ceil(readLengthInBits / 8.0); - this.bytes = pageReader.pageDataByteArray; + this.bytebuf = pageReader.pageDataByteArray; // fill in data. 
- vectorData.writeBytes(bytes, (int) readStartInBytes, (int) readLength); + vectorData.writeBytes(bytebuf, (int) readStartInBytes, (int) readLength); } } static class NullableDictionaryIntReader extends NullableColumnReader<NullableIntVector> { - private byte[] bytes; + private ByteBuf bytebuf; NullableDictionaryIntReader(ParquetRecordReader parentReader, int allocateSize, ColumnDescriptor descriptor, ColumnChunkMetaData columnChunkMetaData, boolean fixedLength, NullableIntVector v, @@ -87,7 +88,7 @@ public class NullableFixedByteAlignedReaders { static class NullableDictionaryBigIntReader extends NullableColumnReader<NullableBigIntVector> { - private byte[] bytes; + private ByteBuf bytebuf; NullableDictionaryBigIntReader(ParquetRecordReader parentReader, int allocateSize, ColumnDescriptor descriptor, ColumnChunkMetaData columnChunkMetaData, boolean fixedLength, NullableBigIntVector v, @@ -106,7 +107,7 @@ public class NullableFixedByteAlignedReaders { static class NullableDictionaryFloat4Reader extends NullableColumnReader<NullableFloat4Vector> { - private byte[] bytes; + private ByteBuf bytebuf; NullableDictionaryFloat4Reader(ParquetRecordReader parentReader, int allocateSize, ColumnDescriptor descriptor, ColumnChunkMetaData columnChunkMetaData, boolean fixedLength, NullableFloat4Vector v, @@ -125,7 +126,7 @@ public class NullableFixedByteAlignedReaders { static class NullableDictionaryFloat8Reader extends NullableColumnReader<NullableFloat8Vector> { - private byte[] bytes; + private ByteBuf bytebuf; NullableDictionaryFloat8Reader(ParquetRecordReader parentReader, int allocateSize, ColumnDescriptor descriptor, ColumnChunkMetaData columnChunkMetaData, boolean fixedLength, NullableFloat8Vector v, @@ -160,7 +161,7 @@ public class NullableFixedByteAlignedReaders { this.readStartInBytes = pageReader.readPosInBytes; this.readLengthInBits = recordsReadInThisIteration * dataTypeLengthInBits; this.readLength = (int) Math.ceil(readLengthInBits / 8.0); - this.bytes = pageReader.pageDataByteArray; + this.bytebuf = pageReader.pageDataByteArray; dataTypeLengthInBytes = (int) Math.ceil(dataTypeLengthInBits / 8.0); for (int i = 0; i < recordsReadInThisIteration; i++) { @@ -183,15 +184,15 @@ public class NullableFixedByteAlignedReaders { @Override void addNext(int start, int index) { - dateVector.getMutator().set(index, DateTimeUtils.fromJulianDay(readIntLittleEndian(bytes, start) - ParquetOutputRecordWriter.JULIAN_DAY_EPOC - 0.5)); + dateVector.getMutator().set(index, DateTimeUtils.fromJulianDay(readIntLittleEndian(bytebuf, start) - ParquetOutputRecordWriter.JULIAN_DAY_EPOC - 0.5)); } // copied out of parquet library, didn't want to deal with the uneeded throws statement they had declared - public static int readIntLittleEndian(byte[] in, int offset) { - int ch4 = in[offset] & 0xff; - int ch3 = in[offset + 1] & 0xff; - int ch2 = in[offset + 2] & 0xff; - int ch1 = in[offset + 3] & 0xff; + public static int readIntLittleEndian(ByteBuf in, int offset) { + int ch4 = in.getByte(offset) & 0xff; + int ch3 = in.getByte(offset + 1) & 0xff; + int ch2 = in.getByte(offset + 2) & 0xff; + int ch1 = in.getByte(offset + 3) & 0xff; return ((ch1 << 24) + (ch2 << 16) + (ch3 << 8) + (ch4 << 0)); } @@ -210,7 +211,7 @@ public class NullableFixedByteAlignedReaders { @Override void addNext(int start, int index) { int width = NullableDecimal28SparseHolder.WIDTH; - BigDecimal intermediate = DecimalUtility.getBigDecimalFromByteArray(bytes, start, dataTypeLengthInBytes, schemaElement.getScale()); + BigDecimal intermediate = 
DecimalUtility.getBigDecimalFromByteBuf(bytebuf, start, dataTypeLengthInBytes, schemaElement.getScale()); DecimalUtility.getSparseFromBigDecimal(intermediate, decimal28Vector.getData(), index * width, schemaElement.getScale(), schemaElement.getPrecision(), NullableDecimal28SparseHolder.nDecimalDigits); } @@ -229,7 +230,7 @@ public class NullableFixedByteAlignedReaders { @Override void addNext(int start, int index) { int width = NullableDecimal38SparseHolder.WIDTH; - BigDecimal intermediate = DecimalUtility.getBigDecimalFromByteArray(bytes, start, dataTypeLengthInBytes, schemaElement.getScale()); + BigDecimal intermediate = DecimalUtility.getBigDecimalFromByteBuf(bytebuf, start, dataTypeLengthInBytes, schemaElement.getScale()); DecimalUtility.getSparseFromBigDecimal(intermediate, decimal38Vector.getData(), index * width, schemaElement.getScale(), schemaElement.getPrecision(), NullableDecimal38SparseHolder.nDecimalDigits); } http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/02e4824e/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/NullableVarLengthValuesColumn.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/NullableVarLengthValuesColumn.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/NullableVarLengthValuesColumn.java index 2be9a37..83812ea 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/NullableVarLengthValuesColumn.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/NullableVarLengthValuesColumn.java @@ -17,6 +17,7 @@ ******************************************************************************/ package org.apache.drill.exec.store.parquet.columnreaders; +import io.netty.buffer.ByteBuf; import org.apache.drill.common.exceptions.ExecutionSetupException; import org.apache.drill.exec.vector.ValueVector; import parquet.bytes.BytesUtils; @@ -37,7 +38,7 @@ public abstract class NullableVarLengthValuesColumn<V extends ValueVector> exten super(parentReader, allocateSize, descriptor, columnChunkMetaData, fixedLength, v, schemaElement); } - public abstract boolean setSafe(int index, byte[] value, int start, int length); + public abstract boolean setSafe(int index, ByteBuf value, int start, int length); public abstract int capacity(); @@ -81,7 +82,7 @@ public abstract class NullableVarLengthValuesColumn<V extends ValueVector> exten } else { // re-purposing this field here for length in BYTES to prevent repetitive multiplication/division - dataTypeLengthInBits = BytesUtils.readIntLittleEndian(pageReader.pageDataByteArray, + dataTypeLengthInBits = BytesUtils.readIntLittleEndian(pageReader.pageDataByteArray.nioBuffer(), (int) pageReader.readyToReadPosInBytes); } // I think this also needs to happen if it is null for the random access http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/02e4824e/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/PageReader.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/PageReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/PageReader.java index 1d300bb..1687c3b 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/PageReader.java +++ 
b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/PageReader.java @@ -18,7 +18,12 @@ package org.apache.drill.exec.store.parquet.columnreaders; import java.io.IOException; +import java.util.ArrayList; +import java.util.List; +import io.netty.buffer.ByteBuf; +import io.netty.buffer.Unpooled; +import org.apache.drill.common.exceptions.DrillRuntimeException; import org.apache.drill.common.exceptions.ExecutionSetupException; import org.apache.drill.exec.store.parquet.ColumnDataReader; import org.apache.drill.exec.store.parquet.ParquetFormatPlugin; @@ -37,6 +42,7 @@ import parquet.format.PageHeader; import parquet.format.PageType; import parquet.format.Util; import parquet.hadoop.metadata.ColumnChunkMetaData; +import parquet.hadoop.metadata.CompressionCodecName; import parquet.schema.PrimitiveType; // class to keep track of the read position of variable length columns @@ -48,7 +54,7 @@ final class PageReader { // store references to the pages that have been uncompressed, but not copied to ValueVectors yet Page currentPage; // buffer to store bytes of current page - byte[] pageDataByteArray; + ByteBuf pageDataByteArray; // for variable length data we need to keep track of our current position in the page data // as the values and lengths are intermixed, making random access to the length data impossible @@ -81,8 +87,12 @@ final class PageReader { Dictionary dictionary; PageHeader pageHeader = null; - PageReader(ColumnReader parentStatus, FileSystem fs, Path path, ColumnChunkMetaData columnChunkMetaData) throws ExecutionSetupException{ + List<ByteBuf> allocatedBuffers; + + PageReader(ColumnReader parentStatus, FileSystem fs, Path path, ColumnChunkMetaData columnChunkMetaData) + throws ExecutionSetupException{ this.parentColumnReader = parentStatus; + allocatedBuffers = new ArrayList<ByteBuf>(); long totalByteLength = columnChunkMetaData.getTotalUncompressedSize(); long start = columnChunkMetaData.getFirstDataPageOffset(); @@ -93,11 +103,25 @@ final class PageReader { f.seek(columnChunkMetaData.getDictionaryPageOffset()); PageHeader pageHeader = Util.readPageHeader(f); assert pageHeader.type == PageType.DICTIONARY_PAGE; - BytesInput bytesIn = parentColumnReader.parentReader.getCodecFactoryExposer() - .decompress( // - dataReader.getPageAsBytesInput(pageHeader.compressed_page_size), // - pageHeader.getUncompressed_page_size(), // - parentColumnReader.columnChunkMetaData.getCodec()); + + BytesInput bytesIn; + ByteBuf uncompressedData=allocateBuffer(pageHeader.getUncompressed_page_size()); + allocatedBuffers.add(uncompressedData); + if(parentColumnReader.columnChunkMetaData.getCodec()==CompressionCodecName.UNCOMPRESSED) { + dataReader.getPageAsBytesBuf(uncompressedData, pageHeader.compressed_page_size); + bytesIn=parentColumnReader.parentReader.getCodecFactoryExposer().getBytesInput(uncompressedData, + pageHeader.getUncompressed_page_size()); + }else{ + ByteBuf compressedData=allocateBuffer(pageHeader.compressed_page_size); + dataReader.getPageAsBytesBuf(compressedData, pageHeader.compressed_page_size); + bytesIn = parentColumnReader.parentReader.getCodecFactoryExposer() + .decompress(parentColumnReader.columnChunkMetaData.getCodec(), + compressedData, + uncompressedData, + pageHeader.compressed_page_size, + pageHeader.getUncompressed_page_size()); + compressedData.release(); + } DictionaryPage page = new DictionaryPage( bytesIn, pageHeader.uncompressed_page_size, @@ -107,7 +131,8 @@ final class PageReader { this.dictionary = 
page.getEncoding().initDictionary(parentStatus.columnDescriptor, page); } } catch (IOException e) { - throw new ExecutionSetupException("Error opening or reading metatdata for parquet file at location: " + path.getName(), e); + throw new ExecutionSetupException("Error opening or reading metadata for parquet file at location: " + + path.getName(), e); } } @@ -137,11 +162,26 @@ final class PageReader { do { pageHeader = dataReader.readPageHeader(); if (pageHeader.getType() == PageType.DICTIONARY_PAGE) { - BytesInput bytesIn = parentColumnReader.parentReader.getCodecFactoryExposer() - .decompress( // - dataReader.getPageAsBytesInput(pageHeader.compressed_page_size), // - pageHeader.getUncompressed_page_size(), // - parentColumnReader.columnChunkMetaData.getCodec()); + + //TODO: Handle buffer allocation exception + BytesInput bytesIn; + ByteBuf uncompressedData=allocateBuffer(pageHeader.getUncompressed_page_size()); + allocatedBuffers.add(uncompressedData); + if( parentColumnReader.columnChunkMetaData.getCodec()== CompressionCodecName.UNCOMPRESSED) { + dataReader.getPageAsBytesBuf(uncompressedData, pageHeader.compressed_page_size); + bytesIn=parentColumnReader.parentReader.getCodecFactoryExposer().getBytesInput(uncompressedData, + pageHeader.getUncompressed_page_size()); + }else{ + ByteBuf compressedData=allocateBuffer(pageHeader.compressed_page_size); + dataReader.getPageAsBytesBuf(compressedData, pageHeader.compressed_page_size); + bytesIn = parentColumnReader.parentReader.getCodecFactoryExposer() + .decompress(parentColumnReader.columnChunkMetaData.getCodec(), + compressedData, + uncompressedData, + pageHeader.compressed_page_size, + pageHeader.getUncompressed_page_size()); + compressedData.release(); + } DictionaryPage page = new DictionaryPage( bytesIn, pageHeader.uncompressed_page_size, @@ -152,11 +192,25 @@ final class PageReader { } } while (pageHeader.getType() == PageType.DICTIONARY_PAGE); - BytesInput bytesIn = parentColumnReader.parentReader.getCodecFactoryExposer() - .decompress( // - dataReader.getPageAsBytesInput(pageHeader.compressed_page_size), // - pageHeader.getUncompressed_page_size(), // - parentColumnReader.columnChunkMetaData.getCodec()); + //TODO: Handle buffer allocation exception + BytesInput bytesIn; + ByteBuf uncompressedData=allocateBuffer(pageHeader.getUncompressed_page_size()); + allocatedBuffers.add(uncompressedData); + if(parentColumnReader.columnChunkMetaData.getCodec()==CompressionCodecName.UNCOMPRESSED) { + dataReader.getPageAsBytesBuf(uncompressedData, pageHeader.compressed_page_size); + bytesIn=parentColumnReader.parentReader.getCodecFactoryExposer().getBytesInput(uncompressedData, + pageHeader.getUncompressed_page_size()); + }else{ + ByteBuf compressedData=allocateBuffer(pageHeader.compressed_page_size); + dataReader.getPageAsBytesBuf(compressedData, pageHeader.compressed_page_size); + bytesIn = parentColumnReader.parentReader.getCodecFactoryExposer() + .decompress(parentColumnReader.columnChunkMetaData.getCodec(), + compressedData, + uncompressedData, + pageHeader.compressed_page_size, + pageHeader.getUncompressed_page_size()); + compressedData.release(); + } currentPage = new Page( bytesIn, pageHeader.data_page_header.num_values, @@ -172,12 +226,12 @@ final class PageReader { return false; } - pageDataByteArray = currentPage.getBytes().toByteArray(); + pageDataByteArray = Unpooled.wrappedBuffer(currentPage.getBytes().toByteBuffer()); readPosInBytes = 0; if (parentColumnReader.getColumnDescriptor().getMaxRepetitionLevel() > 0) { repetitionLevels = 
currentPage.getRlEncoding().getValuesReader(parentColumnReader.columnDescriptor, ValuesType.REPETITION_LEVEL); - repetitionLevels.initFromPage(currentPage.getValueCount(), pageDataByteArray, (int) readPosInBytes); + repetitionLevels.initFromPage(currentPage.getValueCount(), pageDataByteArray.nioBuffer(), (int) readPosInBytes); // we know that the first value will be a 0, at the end of each list of repeated values we will hit another 0 indicating // a new record, although we don't know the length until we hit it (and this is a one way stream of integers) so we // read the first zero here to simplify the reading processes, and start reading the first value the same as all @@ -191,23 +245,23 @@ final class PageReader { if (!currentPage.getValueEncoding().usesDictionary()) { parentColumnReader.usingDictionary = false; definitionLevels = currentPage.getDlEncoding().getValuesReader(parentColumnReader.columnDescriptor, ValuesType.DEFINITION_LEVEL); - definitionLevels.initFromPage(currentPage.getValueCount(), pageDataByteArray, (int) readPosInBytes); + definitionLevels.initFromPage(currentPage.getValueCount(), pageDataByteArray.nioBuffer(), (int) readPosInBytes); readPosInBytes = definitionLevels.getNextOffset(); if (parentColumnReader.columnDescriptor.getType() == PrimitiveType.PrimitiveTypeName.BOOLEAN) { valueReader = currentPage.getValueEncoding().getValuesReader(parentColumnReader.columnDescriptor, ValuesType.VALUES); - valueReader.initFromPage(currentPage.getValueCount(), pageDataByteArray, (int) readPosInBytes); + valueReader.initFromPage(currentPage.getValueCount(), pageDataByteArray.nioBuffer(), (int) readPosInBytes); } } else { parentColumnReader.usingDictionary = true; definitionLevels = currentPage.getDlEncoding().getValuesReader(parentColumnReader.columnDescriptor, ValuesType.DEFINITION_LEVEL); - definitionLevels.initFromPage(currentPage.getValueCount(), pageDataByteArray, (int) readPosInBytes); + definitionLevels.initFromPage(currentPage.getValueCount(), pageDataByteArray.nioBuffer(), (int) readPosInBytes); readPosInBytes = definitionLevels.getNextOffset(); // initialize two of the dictionary readers, one is for determining the lengths of each value, the second is for // actually copying the values out into the vectors dictionaryLengthDeterminingReader = new DictionaryValuesReader(dictionary); - dictionaryLengthDeterminingReader.initFromPage(currentPage.getValueCount(), pageDataByteArray, (int) readPosInBytes); + dictionaryLengthDeterminingReader.initFromPage(currentPage.getValueCount(), pageDataByteArray.nioBuffer(), (int) readPosInBytes); dictionaryValueReader = new DictionaryValuesReader(dictionary); - dictionaryValueReader.initFromPage(currentPage.getValueCount(), pageDataByteArray, (int) readPosInBytes); + dictionaryValueReader.initFromPage(currentPage.getValueCount(), pageDataByteArray.nioBuffer(), (int) readPosInBytes); this.parentColumnReader.usingDictionary = true; } } @@ -221,5 +275,28 @@ final class PageReader { public void clear(){ this.dataReader.clear(); + // Free all memory, including fixed length types. 
(Data is being copied for all types not just var length types) + //if(!this.parentColumnReader.isFixedLength) { + for (ByteBuf b : allocatedBuffers) { + b.release(); + } + //} } + + /* + Allocate direct memory to read data into + */ + private ByteBuf allocateBuffer(int size) { + ByteBuf b; + try { + b = parentColumnReader.parentReader.getOperatorContext().getAllocator().buffer(size); + //b = UnpooledByteBufAllocator.DEFAULT.heapBuffer(size); + }catch(Exception e){ + throw new DrillRuntimeException("Unable to allocate "+size+" bytes of memory in the Parquet Reader."+ + "[Exception: "+e.getMessage()+"]" + ); + } + return b; + } + } http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/02e4824e/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ParquetRecordReader.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ParquetRecordReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ParquetRecordReader.java index 2228787..4e9ac81 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ParquetRecordReader.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/ParquetRecordReader.java @@ -31,6 +31,7 @@ import org.apache.drill.common.types.TypeProtos.MajorType; import org.apache.drill.exec.exception.SchemaChangeException; import org.apache.drill.exec.expr.TypeHelper; import org.apache.drill.exec.ops.FragmentContext; +import org.apache.drill.exec.ops.OperatorContext; import org.apache.drill.exec.physical.impl.OutputMutator; import org.apache.drill.exec.record.MaterializedField; import org.apache.drill.exec.store.RecordReader; @@ -82,6 +83,8 @@ public class ParquetRecordReader implements RecordReader { private VarLenBinaryReader varLengthReader; private ParquetMetadata footer; private List<SchemaPath> columns; + private FragmentContext fragmentContext; + private OperatorContext operatorContext; private final CodecFactoryExposer codecFactoryExposer; int rowGroupIndex; @@ -107,6 +110,8 @@ public class ParquetRecordReader implements RecordReader { this.batchSize = batchSize; this.footer = footer; this.columns = columns; + this.fragmentContext=fragmentContext; + } public CodecFactoryExposer getCodecFactoryExposer() { @@ -133,6 +138,14 @@ public class ParquetRecordReader implements RecordReader { return batchSize; } + public OperatorContext getOperatorContext() { + return operatorContext; + } + + public void setOperatorContext(OperatorContext operatorContext) { + this.operatorContext = operatorContext; + } + /** * @param type a fixed length type from the parquet library enum * @return the length in pageDataByteArray of the type http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/02e4824e/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/VarLengthColumnReaders.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/VarLengthColumnReaders.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/VarLengthColumnReaders.java index 979e8c3..555cb94 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/VarLengthColumnReaders.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/VarLengthColumnReaders.java @@ 
-19,8 +19,14 @@ package org.apache.drill.exec.store.parquet.columnreaders; import java.math.BigDecimal; +import io.netty.buffer.ByteBuf; +import io.netty.buffer.Unpooled; import org.apache.drill.common.exceptions.ExecutionSetupException; import org.apache.drill.common.util.DecimalUtility; +import org.apache.drill.exec.expr.holders.VarCharHolder; +import org.apache.drill.exec.expr.holders.NullableVarCharHolder; +import org.apache.drill.exec.expr.holders.VarBinaryHolder; +import org.apache.drill.exec.expr.holders.NullableVarBinaryHolder; import org.apache.drill.exec.expr.holders.Decimal28SparseHolder; import org.apache.drill.exec.expr.holders.Decimal38SparseHolder; import org.apache.drill.exec.vector.Decimal28SparseVector; @@ -50,9 +56,9 @@ public class VarLengthColumnReaders { } @Override - public boolean setSafe(int index, byte[] bytes, int start, int length) { + public boolean setSafe(int index, ByteBuf bytebuf, int start, int length) { int width = Decimal28SparseHolder.WIDTH; - BigDecimal intermediate = DecimalUtility.getBigDecimalFromByteArray(bytes, start, length, schemaElement.getScale()); + BigDecimal intermediate = DecimalUtility.getBigDecimalFromByteBuf(bytebuf, start, length, schemaElement.getScale()); if (index >= decimal28Vector.getValueCapacity()) { return false; } @@ -79,9 +85,9 @@ public class VarLengthColumnReaders { } @Override - public boolean setSafe(int index, byte[] bytes, int start, int length) { + public boolean setSafe(int index, ByteBuf bytebuf, int start, int length) { int width = Decimal28SparseHolder.WIDTH; - BigDecimal intermediate = DecimalUtility.getBigDecimalFromByteArray(bytes, start, length, schemaElement.getScale()); + BigDecimal intermediate = DecimalUtility.getBigDecimalFromByteBuf(bytebuf, start, length, schemaElement.getScale()); if (index >= nullableDecimal28Vector.getValueCapacity()) { return false; } @@ -109,9 +115,9 @@ public class VarLengthColumnReaders { } @Override - public boolean setSafe(int index, byte[] bytes, int start, int length) { + public boolean setSafe(int index, ByteBuf bytebuf, int start, int length) { int width = Decimal38SparseHolder.WIDTH; - BigDecimal intermediate = DecimalUtility.getBigDecimalFromByteArray(bytes, start, length, schemaElement.getScale()); + BigDecimal intermediate = DecimalUtility.getBigDecimalFromByteBuf(bytebuf, start, length, schemaElement.getScale()); if (index >= decimal28Vector.getValueCapacity()) { return false; } @@ -138,9 +144,9 @@ public class VarLengthColumnReaders { } @Override - public boolean setSafe(int index, byte[] bytes, int start, int length) { + public boolean setSafe(int index, ByteBuf bytebuf, int start, int length) { int width = Decimal38SparseHolder.WIDTH; - BigDecimal intermediate = DecimalUtility.getBigDecimalFromByteArray(bytes, start, length, schemaElement.getScale()); + BigDecimal intermediate = DecimalUtility.getBigDecimalFromByteBuf(bytebuf, start, length, schemaElement.getScale()); if (index >= nullableDecimal38Vector.getValueCapacity()) { return false; } @@ -170,16 +176,26 @@ public class VarLengthColumnReaders { } @Override - public boolean setSafe(int index, byte[] bytes, int start, int length) { + public boolean setSafe(int index, ByteBuf bytebuf, int start, int length) { boolean success; if(index >= varCharVector.getValueCapacity()) return false; if (usingDictionary) { - success = varCharVector.getMutator().setSafe(index, currDictValToWrite.getBytes(), - 0, currDictValToWrite.length()); + ByteBuf b = Unpooled.wrappedBuffer(currDictVal.toByteBuffer()); + int st=0; + int 
len=currDictVal.length(); + VarCharHolder holder = new VarCharHolder(); + holder.buffer=b; + holder.start=0; + holder.end=currDictVal.length(); + success = varCharVector.getMutator().setSafe(index, holder); } else { - success = varCharVector.getMutator().setSafe(index, bytes, start, length); + VarCharHolder holder = new VarCharHolder(); + holder.buffer=bytebuf; + holder.start=start; + holder.end=start+length; + success = varCharVector.getMutator().setSafe(index, holder); } return success; } @@ -204,16 +220,28 @@ public class VarLengthColumnReaders { nullableVarCharVector = v; } - public boolean setSafe(int index, byte[] value, int start, int length) { + public boolean setSafe(int index, ByteBuf value, int start, int length) { boolean success; if(index >= nullableVarCharVector.getValueCapacity()) return false; if (usingDictionary) { - success = nullableVarCharVector.getMutator().setSafe(index, currDictValToWrite.getBytes(), - 0, currDictValToWrite.length()); + ByteBuf b = Unpooled.wrappedBuffer(currDictVal.toByteBuffer()); + int st=0; + int len=currDictVal.length(); + NullableVarCharHolder holder = new NullableVarCharHolder(); + holder.buffer=b; + holder.start=0; + holder.end=currDictVal.length(); + success = nullableVarCharVector.getMutator().setSafe(index, holder); + holder.isSet=1; } else { - success = nullableVarCharVector.getMutator().setSafe(index, value, start, length); + NullableVarCharHolder holder = new NullableVarCharHolder(); + holder.buffer=value; + holder.start=start; + holder.end=start+length; + holder.isSet=1; + success = nullableVarCharVector.getMutator().setSafe(index, holder); } return success; } @@ -237,16 +265,26 @@ public class VarLengthColumnReaders { } @Override - public boolean setSafe(int index, byte[] bytes, int start, int length) { + public boolean setSafe(int index, ByteBuf value, int start, int length) { boolean success; if(index >= varBinaryVector.getValueCapacity()) return false; if (usingDictionary) { - success = varBinaryVector.getMutator().setSafe(index, currDictValToWrite.getBytes(), - 0, currDictValToWrite.length()); + ByteBuf b = Unpooled.wrappedBuffer(currDictVal.toByteBuffer()); + int st=0; + int len=currDictVal.length(); + VarBinaryHolder holder = new VarBinaryHolder(); + holder.buffer=b; + holder.start=0; + holder.end=currDictVal.length(); + success = varBinaryVector.getMutator().setSafe(index, holder); } else { - success = varBinaryVector.getMutator().setSafe(index, bytes, start, length); + VarBinaryHolder holder = new VarBinaryHolder(); + holder.buffer=value; + holder.start=start; + holder.end=start+length; + success = varBinaryVector.getMutator().setSafe(index, holder); } return success; } @@ -271,16 +309,28 @@ public class VarLengthColumnReaders { nullableVarBinaryVector = v; } - public boolean setSafe(int index, byte[] value, int start, int length) { + public boolean setSafe(int index, ByteBuf value, int start, int length) { boolean success; if(index >= nullableVarBinaryVector.getValueCapacity()) return false; if (usingDictionary) { - success = nullableVarBinaryVector.getMutator().setSafe(index, currDictValToWrite.getBytes(), - 0, currDictValToWrite.length()); + ByteBuf b = Unpooled.wrappedBuffer(currDictVal.toByteBuffer()); + int st=0; + int len=currDictVal.length(); + NullableVarBinaryHolder holder = new NullableVarBinaryHolder(); + holder.buffer=b; + holder.start=0; + holder.end=currDictVal.length(); + holder.isSet=1; + success = nullableVarBinaryVector.getMutator().setSafe(index, holder); } else { - success = 
nullableVarBinaryVector.getMutator().setSafe(index, value, start, length); + NullableVarBinaryHolder holder = new NullableVarBinaryHolder(); + holder.buffer=value; + holder.start=start; + holder.end=start+length; + holder.isSet=1; + success = nullableVarBinaryVector.getMutator().setSafe(index, holder); } return success; } http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/02e4824e/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/VarLengthValuesColumn.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/VarLengthValuesColumn.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/VarLengthValuesColumn.java index 092c186..7f16c64 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/VarLengthValuesColumn.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/VarLengthValuesColumn.java @@ -17,6 +17,7 @@ ******************************************************************************/ package org.apache.drill.exec.store.parquet.columnreaders; +import io.netty.buffer.ByteBuf; import org.apache.drill.common.exceptions.ExecutionSetupException; import org.apache.drill.exec.vector.ValueVector; import org.apache.drill.exec.vector.VariableWidthVector; @@ -48,7 +49,7 @@ public abstract class VarLengthValuesColumn<V extends ValueVector> extends VarLe } } - public abstract boolean setSafe(int index, byte[] bytes, int start, int length); + public abstract boolean setSafe(int index, ByteBuf bytes, int start, int length); @Override protected void readField(long recordToRead) { @@ -79,7 +80,7 @@ public abstract class VarLengthValuesColumn<V extends ValueVector> extends VarLe protected boolean readAndStoreValueSizeInformation() throws IOException { // re-purposing this field here for length in BYTES to prevent repetitive multiplication/division try { - dataTypeLengthInBits = BytesUtils.readIntLittleEndian(pageReader.pageDataByteArray, + dataTypeLengthInBits = BytesUtils.readIntLittleEndian(pageReader.pageDataByteArray.nioBuffer(), (int) pageReader.readyToReadPosInBytes); } catch (Throwable t) { throw t; http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/02e4824e/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet2/DrillParquetReader.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet2/DrillParquetReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet2/DrillParquetReader.java index aaeb536..0d2a225 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet2/DrillParquetReader.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet2/DrillParquetReader.java @@ -22,6 +22,7 @@ import com.google.common.collect.Lists; import org.apache.drill.common.exceptions.ExecutionSetupException; import org.apache.drill.common.expression.PathSegment; import org.apache.drill.common.expression.SchemaPath; +import org.apache.drill.exec.ops.OperatorContext; import org.apache.drill.exec.physical.impl.OutputMutator; import org.apache.drill.exec.store.RecordReader; import org.apache.drill.exec.store.parquet.RowGroupReadEntry; @@ -64,6 +65,8 @@ public class DrillParquetReader implements RecordReader { private DrillParquetRecordMaterializer recordMaterializer; private int recordCount; private 
List<ValueVector> primitiveVectors; + private OperatorContext operatorContext; + public DrillParquetReader(ParquetMetadata footer, RowGroupReadEntry entry, List<SchemaPath> columns, Configuration conf) { this.footer = footer; @@ -201,4 +204,10 @@ public class DrillParquetReader implements RecordReader { @Override public void cleanup() { } + + public void setOperatorContext(OperatorContext operatorContext) { + this.operatorContext = operatorContext; + } + + } http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/02e4824e/exec/java-exec/src/main/java/org/apache/drill/exec/store/pojo/PojoRecordReader.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/pojo/PojoRecordReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/pojo/PojoRecordReader.java index 1ebd1f5..9a5920d 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/pojo/PojoRecordReader.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/pojo/PojoRecordReader.java @@ -23,6 +23,7 @@ import java.util.Iterator; import org.apache.drill.common.exceptions.ExecutionSetupException; import org.apache.drill.exec.exception.SchemaChangeException; +import org.apache.drill.exec.ops.OperatorContext; import org.apache.drill.exec.physical.impl.OutputMutator; import org.apache.drill.exec.store.RecordReader; import org.apache.drill.exec.store.pojo.Writers.BitWriter; @@ -47,12 +48,21 @@ public class PojoRecordReader<T> implements RecordReader{ private PojoWriter[] writers; private boolean doCurrent; private T currentPojo; + private OperatorContext operatorContext; public PojoRecordReader(Class<T> pojoClass, Iterator<T> iterator){ this.pojoClass = pojoClass; this.iterator = iterator; } + public OperatorContext getOperatorContext() { + return operatorContext; + } + + public void setOperatorContext(OperatorContext operatorContext) { + this.operatorContext = operatorContext; + } + @Override public void setup(OutputMutator output) throws ExecutionSetupException { try{ http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/02e4824e/exec/java-exec/src/main/java/org/apache/drill/exec/store/text/DrillTextRecordReader.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/text/DrillTextRecordReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/text/DrillTextRecordReader.java index 28babd1..68921a2 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/text/DrillTextRecordReader.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/text/DrillTextRecordReader.java @@ -31,6 +31,7 @@ import org.apache.drill.exec.ExecConstants; import org.apache.drill.exec.exception.SchemaChangeException; import org.apache.drill.exec.expr.TypeHelper; import org.apache.drill.exec.ops.FragmentContext; +import org.apache.drill.exec.ops.OperatorContext; import org.apache.drill.exec.physical.impl.OutputMutator; import org.apache.drill.exec.record.MaterializedField; import org.apache.drill.exec.store.RecordReader; @@ -57,7 +58,8 @@ public class DrillTextRecordReader implements RecordReader { private byte delimiter; private int targetRecordCount; private FieldReference ref = new FieldReference(COL_NAME); - private FragmentContext context; + private FragmentContext fragmentContext; + private OperatorContext operatorContext; private RepeatedVarCharVector vector; private List<Integer> columnIds = 
Lists.newArrayList(); private LongWritable key; @@ -67,7 +69,7 @@ public class DrillTextRecordReader implements RecordReader { private boolean first = true; public DrillTextRecordReader(FileSplit split, FragmentContext context, char delimiter, List<SchemaPath> columns) { - this.context = context; + this.fragmentContext = context; this.delimiter = (byte) delimiter; boolean getEntireRow = false; @@ -107,6 +109,14 @@ public class DrillTextRecordReader implements RecordReader { } } + public OperatorContext getOperatorContext() { + return operatorContext; + } + + public void setOperatorContext(OperatorContext operatorContext) { + this.operatorContext = operatorContext; + } + @Override public void setup(OutputMutator output) throws ExecutionSetupException { MaterializedField field = MaterializedField.create(ref, Types.repeated(TypeProtos.MinorType.VARCHAR)); http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/02e4824e/exec/java-exec/src/main/java/parquet/hadoop/CodecFactoryExposer.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/parquet/hadoop/CodecFactoryExposer.java b/exec/java-exec/src/main/java/parquet/hadoop/CodecFactoryExposer.java index cb36102..7868a53 100644 --- a/exec/java-exec/src/main/java/parquet/hadoop/CodecFactoryExposer.java +++ b/exec/java-exec/src/main/java/parquet/hadoop/CodecFactoryExposer.java @@ -18,9 +18,20 @@ package parquet.hadoop; import java.io.IOException; +import java.io.OutputStream; +import java.nio.ByteBuffer; +import java.nio.channels.Channels; +import java.nio.channels.WritableByteChannel; +import java.util.HashMap; +import java.util.Map; +import io.netty.buffer.ByteBuf; +import org.apache.drill.common.exceptions.DrillRuntimeException; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.io.compress.CompressionCodec; +import org.apache.hadoop.io.compress.DirectDecompressionCodec; +import org.apache.hadoop.util.ReflectionUtils; import parquet.bytes.BytesInput; import parquet.hadoop.CodecFactory.BytesDecompressor; import parquet.hadoop.metadata.CompressionCodecName; @@ -28,9 +39,11 @@ import parquet.hadoop.metadata.CompressionCodecName; public class CodecFactoryExposer{ private CodecFactory codecFactory; + private final Map<String, org.apache.hadoop.io.compress.DirectDecompressionCodec> codecByName = new HashMap<String, org.apache.hadoop.io.compress.DirectDecompressionCodec>(); + private Configuration configuration; public CodecFactoryExposer(Configuration config){ - codecFactory = new CodecFactory(config); + codecFactory = new CodecFactory(config);configuration = config; } public CodecFactory getCodecFactory() { @@ -41,7 +54,84 @@ public class CodecFactoryExposer{ return codecFactory.getDecompressor(codecName).decompress(bytes, uncompressedSize); } - public BytesDecompressor getDecompressor(CompressionCodecName codec) { - return codecFactory.getDecompressor(codec); + public BytesInput getBytesInput(ByteBuf uncompressedByteBuf, int uncompressedSize) throws IOException { + ByteBuffer outBuffer=uncompressedByteBuf.nioBuffer(0, uncompressedSize); + return new HadoopByteBufBytesInput(outBuffer, 0, outBuffer.limit()); + } + + public BytesInput decompress(CompressionCodecName codecName, + ByteBuf compressedByteBuf, + ByteBuf uncompressedByteBuf, + int compressedSize, + int uncompressedSize) throws IOException { + ByteBuffer inpBuffer=compressedByteBuf.nioBuffer(0, compressedSize); + ByteBuffer outBuffer=uncompressedByteBuf.nioBuffer(0, uncompressedSize); + CompressionCodec c = 
getCodec(codecName); + + Class<?> cx = c.getClass(); + ClassLoader l = cx.getClassLoader(); + Class<?>[] inf = cx.getInterfaces(); + + DirectDecompressionCodec d = (DirectDecompressionCodec)c; + + if(d!=null) { + d.createDirectDecompressor().decompress(inpBuffer, outBuffer); + }else{ + throw new DrillRuntimeException("Cannot create a decompression codec for codec "+codecName.name()); + } + return new HadoopByteBufBytesInput(outBuffer, 0, outBuffer.limit()); + } + + private DirectDecompressionCodec getCodec(CompressionCodecName codecName) { + String codecClassName = codecName.getHadoopCompressionCodecClassName(); + if (codecClassName == null) { + return null; + } + DirectDecompressionCodec codec = codecByName.get(codecClassName); + if (codec != null) { + return codec; + } + + try { + Class<?> codecClass = Class.forName(codecClassName); + codec = (DirectDecompressionCodec)ReflectionUtils.newInstance(codecClass, configuration); + codecByName.put(codecClassName, codec); + return codec; + } catch (ClassNotFoundException e) { + throw new BadConfigurationException("Class " + codecClassName + " was not found", e); + } + } + + public class HadoopByteBufBytesInput extends BytesInput{ + + private final ByteBuffer byteBuf; + private final int length; + private final int offset; + + private HadoopByteBufBytesInput(ByteBuffer byteBuf, int offset, int length) { + super(); + this.byteBuf = byteBuf; + this.offset = offset; + this.length = length; + } + + public void writeAllTo(OutputStream out) throws IOException { + final WritableByteChannel outputChannel = Channels.newChannel(out); + byteBuf.position(offset); + ByteBuffer tempBuf = byteBuf.slice(); + tempBuf.limit(length); + outputChannel.write(tempBuf); + } + + public ByteBuffer toByteBuffer() throws IOException { + byteBuf.position(offset); + ByteBuffer buf = byteBuf.slice(); + buf.limit(length); + return buf; + } + + public long size() { + return length; + } } } http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/02e4824e/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/writer/TestParquetWriter.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/writer/TestParquetWriter.java b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/writer/TestParquetWriter.java index 89beeb0..d399fa9 100644 --- a/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/writer/TestParquetWriter.java +++ b/exec/java-exec/src/test/java/org/apache/drill/exec/physical/impl/writer/TestParquetWriter.java @@ -27,6 +27,7 @@ import org.apache.drill.BaseTestQuery; import org.apache.drill.exec.ExecConstants; import org.apache.drill.exec.exception.SchemaChangeException; import org.apache.drill.exec.memory.TopLevelAllocator; +import org.apache.drill.exec.planner.physical.PlannerSettings; import org.apache.drill.exec.record.BatchSchema; import org.apache.drill.exec.record.RecordBatchLoader; import org.apache.drill.exec.record.VectorWrapper; @@ -219,7 +220,8 @@ public class TestParquetWriter extends BaseTestQuery { fs.delete(path, true); } - test("use dfs_test.tmp"); + test("use dfs.tmp"); +// test("ALTER SESSION SET `planner.add_producer_consumer` = false"); String query = String.format("SELECT %s FROM %s", selection, inputTable); String create = "CREATE TABLE " + outputFile + " AS " + query; String validateQuery = String.format("SELECT %s FROM " + outputFile, validationSelection); 
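For reference, the direct-read path added to CodecFactoryExposer above reduces to the sketch below: a Hadoop 2.4+ DirectDecompressionCodec is instantiated reflectively and decompresses one NIO buffer straight into another, with no intermediate on-heap byte[]. This is an illustrative class only, not the committed code; the hard-coded SnappyCodec class name is an assumption for the example, whereas the patch resolves the class name from Parquet's CompressionCodecName.

    import java.nio.ByteBuffer;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.io.compress.DirectDecompressionCodec;
    import org.apache.hadoop.util.ReflectionUtils;

    public class DirectDecompressSketch {
      // Decompresses 'compressed' into 'uncompressed' entirely off-heap.
      public static void decompressPage(ByteBuffer compressed, ByteBuffer uncompressed,
                                        Configuration conf) throws Exception {
        // Resolve and instantiate the Hadoop codec reflectively, as getCodec() does in
        // the patch (the Snappy class name here is an assumption for illustration).
        Class<?> codecClass = Class.forName("org.apache.hadoop.io.compress.SnappyCodec");
        DirectDecompressionCodec codec =
            (DirectDecompressionCodec) ReflectionUtils.newInstance(codecClass, conf);
        // Hadoop 2.4+: decompress straight from one direct buffer into the other,
        // which is what requires the Hadoop 2.4 minimum noted in the commit subject.
        codec.createDirectDecompressor().decompress(compressed, uncompressed);
      }
    }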
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/02e4824e/exec/jdbc/pom.xml ---------------------------------------------------------------------- diff --git a/exec/jdbc/pom.xml b/exec/jdbc/pom.xml index b739db8..b9b3feb 100644 --- a/exec/jdbc/pom.xml +++ b/exec/jdbc/pom.xml @@ -127,7 +127,12 @@ <dependencies> <dependency> <groupId>org.apache.hadoop</groupId> - <artifactId>hadoop-core</artifactId> + <artifactId>hadoop-client</artifactId> + <scope>test</scope> + </dependency> + <dependency> + <groupId>org.apache.hadoop</groupId> + <artifactId>hadoop-common</artifactId> <scope>test</scope> </dependency> </dependencies> @@ -138,6 +143,13 @@ <dependency> <groupId>org.apache.hadoop</groupId> <artifactId>hadoop-core</artifactId> + <version>2.4.1-mapr-4.0.0-SNAPSHOT</version> + <exclusions> + <exclusion> + <artifactId>commons-logging</artifactId> + <groupId>commons-logging</groupId> + </exclusion> + </exclusions> <scope>test</scope> </dependency> </dependencies>
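The VarLengthColumnReaders hunks above all follow one pattern: instead of copying each value into a byte[], the reader points a value holder at a region of the page ByteBuf and hands the holder to the vector mutator by reference. A minimal sketch of that pattern for VarCharVector follows (the nullable variants in the patch additionally set holder.isSet = 1); the wrapper class is illustrative, the holder fields and setSafe(index, holder) call mirror the patch itself.

    import io.netty.buffer.ByteBuf;
    import org.apache.drill.exec.expr.holders.VarCharHolder;
    import org.apache.drill.exec.vector.VarCharVector;

    class HolderWriteSketch {
      // Writes 'length' bytes starting at 'start' of 'pageBuf' into row 'index'.
      // Returns false when the vector is out of capacity, matching setSafe semantics.
      static boolean writeVarChar(VarCharVector vector, int index,
                                  ByteBuf pageBuf, int start, int length) {
        if (index >= vector.getValueCapacity()) {
          return false;
        }
        VarCharHolder holder = new VarCharHolder();
        holder.buffer = pageBuf;      // no intermediate byte[] copy
        holder.start = start;
        holder.end = start + length;
        return vector.getMutator().setSafe(index, holder);
      }
    }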

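Finally, the PageReader changes near the top of this patch read page data into direct memory obtained from the operator context's allocator and release every allocated buffer again in clear(). The sketch below captures that allocate/release discipline under the assumption that the allocator is Drill's BufferAllocator with buffer(int) returning a Netty ByteBuf, as used in the patch; the class and method names are illustrative, not the committed PageReader.

    import java.util.ArrayList;
    import java.util.List;
    import io.netty.buffer.ByteBuf;
    import org.apache.drill.common.exceptions.DrillRuntimeException;
    import org.apache.drill.exec.memory.BufferAllocator;

    class DirectPageBufferSketch {
      private final BufferAllocator allocator;
      private final List<ByteBuf> allocatedBuffers = new ArrayList<ByteBuf>();

      DirectPageBufferSketch(BufferAllocator allocator) {
        this.allocator = allocator;
      }

      // Allocate 'size' bytes of direct memory, wrapping allocator failures the
      // same way the patch does.
      ByteBuf allocateBuffer(int size) {
        try {
          ByteBuf b = allocator.buffer(size);
          allocatedBuffers.add(b);
          return b;
        } catch (Exception e) {
          throw new DrillRuntimeException(
              "Unable to allocate " + size + " bytes of memory in the Parquet Reader."
              + " [Exception: " + e.getMessage() + "]");
        }
      }

      // Release everything that was allocated; the patch does this in clear().
      void clear() {
        for (ByteBuf b : allocatedBuffers) {
          b.release();
        }
        allocatedBuffers.clear();
      }
    }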