http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/5c5cef06/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/IndexPointer.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/IndexPointer.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/IndexPointer.java new file mode 100644 index 0000000..020c5d8 --- /dev/null +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/IndexPointer.java @@ -0,0 +1,24 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.drill.exec.physical.impl.common; + +public class IndexPointer { + static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(IndexPointer.class); + + public int value; +}
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/5c5cef06/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java index 7233f69..15044b8 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/join/HashJoinBatch.java @@ -18,7 +18,6 @@ package org.apache.drill.exec.physical.impl.join; import java.io.IOException; -import java.util.ArrayList; import java.util.List; import org.apache.drill.common.expression.FieldReference; @@ -35,7 +34,6 @@ import org.apache.drill.exec.exception.SchemaChangeException; import org.apache.drill.exec.expr.ClassGenerator; import org.apache.drill.exec.expr.CodeGenerator; import org.apache.drill.exec.expr.TypeHelper; -import org.apache.drill.exec.expr.holders.IntHolder; import org.apache.drill.exec.memory.OutOfMemoryException; import org.apache.drill.exec.ops.FragmentContext; import org.apache.drill.exec.ops.MetricDef; @@ -44,8 +42,8 @@ import org.apache.drill.exec.physical.impl.common.ChainedHashTable; import org.apache.drill.exec.physical.impl.common.HashTable; import org.apache.drill.exec.physical.impl.common.HashTableConfig; import org.apache.drill.exec.physical.impl.common.HashTableStats; +import org.apache.drill.exec.physical.impl.common.IndexPointer; import org.apache.drill.exec.physical.impl.sort.RecordBatchData; -import org.apache.drill.exec.physical.impl.svremover.RemovingRecordBatch; import org.apache.drill.exec.record.AbstractRecordBatch; import org.apache.drill.exec.record.BatchSchema; import org.apache.drill.exec.record.ExpandableHyperContainer; @@ -54,7 +52,6 @@ import org.apache.drill.exec.record.RecordBatch; import org.apache.drill.exec.record.TypedFieldId; import org.apache.drill.exec.record.VectorWrapper; import org.apache.drill.exec.vector.ValueVector; -import org.apache.drill.exec.vector.allocator.VectorAllocator; import org.eigenbase.rel.JoinRelType; import com.sun.codemodel.JExpr; @@ -155,7 +152,7 @@ public class HashJoinBatch extends AbstractRecordBatch<HashJoinPOP> { NUM_ENTRIES, NUM_RESIZING, RESIZING_TIME; - + // duplicate for hash ag @Override @@ -163,7 +160,7 @@ public class HashJoinBatch extends AbstractRecordBatch<HashJoinPOP> { return ordinal(); } } - + @Override public int getRecordCount() { return outputRecords; @@ -339,7 +336,7 @@ public class HashJoinBatch extends AbstractRecordBatch<HashJoinPOP> { hjHelper.addNewBatch(currentRecordCount); // Holder contains the global index where the key is hashed into using the hash table - IntHolder htIndex = new IntHolder(); + IndexPointer htIndex = new IndexPointer(); // For every record in the build batch , hash the key columns for (int i = 0; i < currentRecordCount; i++) { @@ -491,7 +488,9 @@ public class HashJoinBatch extends AbstractRecordBatch<HashJoinPOP> { @Override public void cleanup() { - hjHelper.clear(); + if(hjHelper != null){ + hjHelper.clear(); + } // If we didn't receive any data, hyperContainer may be null, check before clearing if (hyperContainer != null) { http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/5c5cef06/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/DefaultSqlHandler.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/DefaultSqlHandler.java b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/DefaultSqlHandler.java index c5d342f..994eea4 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/DefaultSqlHandler.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/planner/sql/handlers/DefaultSqlHandler.java @@ -219,6 +219,7 @@ public class DefaultSqlHandler extends AbstractSqlHandler { phyRelNode = ProducerConsumerPrelVisitor.addProducerConsumerToScans(phyRelNode, (int) queueSize); } + /* 6.) * if the client does not support complex types (Map, Repeated) * insert a project which which would convert http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/5c5cef06/exec/java-exec/src/main/java/org/apache/drill/exec/record/DeadBuf.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/record/DeadBuf.java b/exec/java-exec/src/main/java/org/apache/drill/exec/record/DeadBuf.java index 61c1689..ac26969 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/record/DeadBuf.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/record/DeadBuf.java @@ -20,6 +20,7 @@ package org.apache.drill.exec.record; import io.netty.buffer.ByteBuf; import io.netty.buffer.ByteBufAllocator; import io.netty.buffer.ByteBufProcessor; +import io.netty.buffer.DrillBuf; import java.io.IOException; import java.io.InputStream; @@ -32,14 +33,14 @@ import java.nio.charset.Charset; public class DeadBuf extends ByteBuf { static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(DeadBuf.class); - + private static final String ERROR_MESSAGE = "Attemped to access a DeadBuf. This would happen if you attempted to interact with a buffer that has been moved or not yet initialized."; - - public static final DeadBuf DEAD_BUFFER = new DeadBuf(); + + public static final DrillBuf DEAD_BUFFER = null; private DeadBuf(){} - + @Override public boolean isReadable(int size) { throw new UnsupportedOperationException(ERROR_MESSAGE); @@ -654,7 +655,7 @@ public class DeadBuf extends ByteBuf { } - + @Override public int bytesBefore(byte value) { throw new UnsupportedOperationException(ERROR_MESSAGE); @@ -667,14 +668,14 @@ public class DeadBuf extends ByteBuf { } - + @Override public int bytesBefore(int index, int length, byte value) { throw new UnsupportedOperationException(ERROR_MESSAGE); } - + @Override public ByteBuf copy() { throw new UnsupportedOperationException(ERROR_MESSAGE); @@ -839,6 +840,6 @@ public class DeadBuf extends ByteBuf { public String toString() { return null; } - - + + } http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/5c5cef06/exec/java-exec/src/main/java/org/apache/drill/exec/record/RawFragmentBatch.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/record/RawFragmentBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/record/RawFragmentBatch.java index 0cca9cd..dbf17c4 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/record/RawFragmentBatch.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/record/RawFragmentBatch.java @@ -18,6 +18,7 @@ package org.apache.drill.exec.record; import io.netty.buffer.ByteBuf; +import io.netty.buffer.DrillBuf; import org.apache.drill.exec.proto.BitData.FragmentRecordBatch; import org.apache.drill.exec.rpc.RemoteConnection; @@ -29,10 +30,10 @@ public class RawFragmentBatch { final RemoteConnection connection; final FragmentRecordBatch header; - final ByteBuf body; + final DrillBuf body; final ResponseSender sender; - public RawFragmentBatch(RemoteConnection connection, FragmentRecordBatch header, ByteBuf body, ResponseSender sender) { + public RawFragmentBatch(RemoteConnection connection, FragmentRecordBatch header, DrillBuf body, ResponseSender sender) { super(); this.header = header; this.body = body; @@ -45,7 +46,7 @@ public class RawFragmentBatch { return header; } - public ByteBuf getBody() { + public DrillBuf getBody() { return body; } http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/5c5cef06/exec/java-exec/src/main/java/org/apache/drill/exec/record/RecordBatchLoader.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/record/RecordBatchLoader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/record/RecordBatchLoader.java index 0caed7d..d756685 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/record/RecordBatchLoader.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/record/RecordBatchLoader.java @@ -17,13 +17,12 @@ */ package org.apache.drill.exec.record; -import io.netty.buffer.ByteBuf; +import io.netty.buffer.DrillBuf; import java.util.Iterator; import java.util.List; import java.util.Map; -import io.netty.buffer.EmptyByteBuf; import org.apache.drill.common.expression.SchemaPath; import org.apache.drill.exec.exception.SchemaChangeException; import org.apache.drill.exec.expr.TypeHelper; @@ -57,7 +56,7 @@ public class RecordBatchLoader implements VectorAccessible, Iterable<VectorWrapp * @return Whether or not the schema changed since the previous load. * @throws SchemaChangeException */ - public boolean load(RecordBatchDef def, ByteBuf buf) throws SchemaChangeException { + public boolean load(RecordBatchDef def, DrillBuf buf) throws SchemaChangeException { // logger.debug("Loading record batch with def {} and data {}", def, buf); container.zeroVectors(); this.valueCount = def.getRecordCount(); @@ -85,7 +84,7 @@ public class RecordBatchLoader implements VectorAccessible, Iterable<VectorWrapp } if (fmd.getValueCount() == 0 && (!fmd.hasGroupCount() || fmd.getGroupCount() == 0)) { // v.clear(); - v.load(fmd, new EmptyByteBuf(allocator.getUnderlyingAllocator())); + v.load(fmd, allocator.buffer(0)); } else { v.load(fmd, buf.slice(bufOffset, fmd.getBufferLength())); } http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/5c5cef06/exec/java-exec/src/main/java/org/apache/drill/exec/record/TypedFieldId.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/record/TypedFieldId.java b/exec/java-exec/src/main/java/org/apache/drill/exec/record/TypedFieldId.java index 22aa731..3154e6e 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/record/TypedFieldId.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/record/TypedFieldId.java @@ -231,5 +231,14 @@ public class TypedFieldId { return true; } + @Override + public String toString() { + final int maxLen = 10; + return "TypedFieldId [fieldIds=" + + (fieldIds != null ? Arrays.toString(Arrays.copyOf(fieldIds, Math.min(fieldIds.length, maxLen))) : null) + + ", remainder=" + remainder + "]"; + } + + } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/5c5cef06/exec/java-exec/src/main/java/org/apache/drill/exec/record/WritableBatch.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/record/WritableBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/record/WritableBatch.java index b55ed82..3810115 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/record/WritableBatch.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/record/WritableBatch.java @@ -17,8 +17,7 @@ */ package org.apache.drill.exec.record; -import io.netty.buffer.ByteBuf; -import io.netty.buffer.CompositeByteBuf; +import io.netty.buffer.DrillBuf; import java.util.List; @@ -37,15 +36,15 @@ public class WritableBatch { static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(WritableBatch.class); private final RecordBatchDef def; - private final ByteBuf[] buffers; + private final DrillBuf[] buffers; private boolean cleared = false; - private WritableBatch(RecordBatchDef def, List<ByteBuf> buffers) { + private WritableBatch(RecordBatchDef def, List<DrillBuf> buffers) { this.def = def; - this.buffers = buffers.toArray(new ByteBuf[buffers.size()]); + this.buffers = buffers.toArray(new DrillBuf[buffers.size()]); } - private WritableBatch(RecordBatchDef def, ByteBuf[] buffers) { + private WritableBatch(RecordBatchDef def, DrillBuf[] buffers) { super(); this.def = def; this.buffers = buffers; @@ -55,20 +54,27 @@ public class WritableBatch { return def; } - public ByteBuf[] getBuffers() { + public DrillBuf[] getBuffers() { return buffers; } public void reconstructContainer(VectorContainer container) { Preconditions.checkState(!cleared, "Attempted to reconstruct a container from a WritableBatch after it had been cleared"); - if (buffers.length > 0) { /* If we have ByteBuf's associated with value vectors */ + if (buffers.length > 0) { /* If we have DrillBuf's associated with value vectors */ + int len = 0; + for(DrillBuf b : buffers){ + len += b.capacity(); + } - CompositeByteBuf cbb = new CompositeByteBuf(buffers[0].alloc(), true, buffers.length); + DrillBuf newBuf = buffers[0].getAllocator().buffer(len); /* Copy data from each buffer into the compound buffer */ - for (ByteBuf buf : buffers) { - cbb.addComponent(buf); + int offset = 0; + for (DrillBuf buf : buffers) { + newBuf.setBytes(offset, buf); + offset += buf.capacity(); + buf.release(); } List<SerializedField> fields = def.getFieldList(); @@ -83,7 +89,7 @@ public class WritableBatch { for (VectorWrapper<?> vv : container) { SerializedField fmd = fields.get(vectorIndex); ValueVector v = vv.getValueVector(); - ByteBuf bb = cbb.slice(bufferOffset, fmd.getBufferLength()); + DrillBuf bb = newBuf.slice(bufferOffset, fmd.getBufferLength()); // v.load(fmd, cbb.slice(bufferOffset, fmd.getBufferLength())); v.load(fmd, bb); bb.release(); @@ -109,7 +115,7 @@ public class WritableBatch { public void clear() { if(cleared) return; - for (ByteBuf buf : buffers) { + for (DrillBuf buf : buffers) { buf.release(); } cleared = true; @@ -125,7 +131,7 @@ public class WritableBatch { } public static WritableBatch getBatchNoHV(int recordCount, Iterable<ValueVector> vectors, boolean isSV2) { - List<ByteBuf> buffers = Lists.newArrayList(); + List<DrillBuf> buffers = Lists.newArrayList(); List<SerializedField> metadata = Lists.newArrayList(); for (ValueVector vv : vectors) { @@ -137,7 +143,7 @@ public class WritableBatch { continue; } - for (ByteBuf b : vv.getBuffers()) { + for (DrillBuf b : vv.getBuffers()) { buffers.add(b); } // remove vv access to buffers. @@ -159,13 +165,13 @@ public class WritableBatch { } public void retainBuffers() { - for (ByteBuf buf : buffers) { + for (DrillBuf buf : buffers) { buf.retain(); } } - + public void retainBuffers(int increment) { - for (ByteBuf buf : buffers) { + for (DrillBuf buf : buffers) { buf.retain(increment); } } http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/5c5cef06/exec/java-exec/src/main/java/org/apache/drill/exec/record/selection/SelectionVector2.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/record/selection/SelectionVector2.java b/exec/java-exec/src/main/java/org/apache/drill/exec/record/selection/SelectionVector2.java index af1e2b6..038bb2f 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/record/selection/SelectionVector2.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/record/selection/SelectionVector2.java @@ -17,7 +17,7 @@ */ package org.apache.drill.exec.record.selection; -import io.netty.buffer.ByteBuf; +import io.netty.buffer.DrillBuf; import java.io.Closeable; import java.io.IOException; @@ -33,7 +33,7 @@ public class SelectionVector2 implements Closeable{ private final BufferAllocator allocator; private int recordCount; - private ByteBuf buffer = DeadBuf.DEAD_BUFFER; + private DrillBuf buffer = DeadBuf.DEAD_BUFFER; public static final int RECORD_SIZE = 2; @@ -45,9 +45,9 @@ public class SelectionVector2 implements Closeable{ return recordCount; } - public ByteBuf getBuffer() + public DrillBuf getBuffer() { - ByteBuf bufferHandle = this.buffer; + DrillBuf bufferHandle = this.buffer; /* Increment the ref count for this buffer */ bufferHandle.retain(); @@ -60,7 +60,7 @@ public class SelectionVector2 implements Closeable{ return bufferHandle; } - public void setBuffer(ByteBuf bufferHandle) + public void setBuffer(DrillBuf bufferHandle) { /* clear the existing buffer */ clear(); @@ -70,7 +70,6 @@ public class SelectionVector2 implements Closeable{ } public char getIndex(int index){ - return buffer.getChar(index * RECORD_SIZE); } @@ -78,6 +77,14 @@ public class SelectionVector2 implements Closeable{ buffer.setChar(index * RECORD_SIZE, value); } + public long getDataAddr(){ + return buffer.memoryAddress(); + } + + public void setIndex(int index, int value){ + buffer.setChar(index, value); + } + public boolean allocateNew(int size){ clear(); buffer = allocator.buffer(size * RECORD_SIZE); @@ -100,7 +107,7 @@ public class SelectionVector2 implements Closeable{ clear(); return newSV; } - + public void clear() { if (buffer != null && buffer != DeadBuf.DEAD_BUFFER) { buffer.release(); @@ -108,7 +115,7 @@ public class SelectionVector2 implements Closeable{ recordCount = 0; } } - + public void setRecordCount(int recordCount){ // logger.debug("Seting record count to {}", recordCount); this.recordCount = recordCount; http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/5c5cef06/exec/java-exec/src/main/java/org/apache/drill/exec/resolver/TypeCastRules.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/resolver/TypeCastRules.java b/exec/java-exec/src/main/java/org/apache/drill/exec/resolver/TypeCastRules.java index 8176567..93f5b6d 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/resolver/TypeCastRules.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/resolver/TypeCastRules.java @@ -30,9 +30,9 @@ import org.apache.drill.common.types.Types; import org.apache.drill.common.types.TypeProtos.DataMode; import org.apache.drill.common.types.TypeProtos.MajorType; import org.apache.drill.common.types.TypeProtos.MinorType; -import org.apache.drill.common.util.DecimalUtility; import org.apache.drill.exec.expr.annotations.FunctionTemplate.NullHandling; import org.apache.drill.exec.expr.fn.DrillFuncHolder; +import org.apache.drill.exec.util.DecimalUtility; public class TypeCastRules { http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/5c5cef06/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/ProtobufLengthDecoder.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/ProtobufLengthDecoder.java b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/ProtobufLengthDecoder.java index 473e3e6..02fb75e 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/ProtobufLengthDecoder.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/ProtobufLengthDecoder.java @@ -18,8 +18,6 @@ package org.apache.drill.exec.rpc; import io.netty.buffer.ByteBuf; -import io.netty.buffer.SwappedByteBuf; -import io.netty.buffer.Unpooled; import io.netty.channel.ChannelHandlerContext; import io.netty.handler.codec.ByteToMessageDecoder; import io.netty.handler.codec.CorruptedFrameException; @@ -29,7 +27,6 @@ import java.util.List; import org.apache.drill.exec.memory.BufferAllocator; import com.google.protobuf.CodedInputStream; -import org.apache.drill.exec.proto.GeneralRPCProtos.RpcMode; /** * Modified version of ProtobufVarint32FrameDecoder that avoids bytebuf copy. @@ -39,7 +36,7 @@ public class ProtobufLengthDecoder extends ByteToMessageDecoder { private BufferAllocator allocator; private OutOfMemoryHandler outOfMemoryHandler; - + public ProtobufLengthDecoder(BufferAllocator allocator, OutOfMemoryHandler outOfMemoryHandler) { super(); this.allocator = allocator; @@ -90,14 +87,14 @@ public class ProtobufLengthDecoder extends ByteToMessageDecoder { return; } outBuf.writeBytes(in, in.readerIndex(), length); - + in.skipBytes(length); - + if (RpcConstants.EXTRA_DEBUGGING) logger.debug(String.format( "ReaderIndex is %d after length header of %d bytes and frame body of length %d bytes.", in.readerIndex(), i + 1, length)); - + out.add(outBuf); return; } http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/5c5cef06/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/data/DataResponseHandler.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/data/DataResponseHandler.java b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/data/DataResponseHandler.java index f1c94d3..d4a73c3 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/data/DataResponseHandler.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/data/DataResponseHandler.java @@ -17,11 +17,10 @@ */ package org.apache.drill.exec.rpc.data; -import io.netty.buffer.ByteBuf; +import io.netty.buffer.DrillBuf; import org.apache.drill.exec.proto.BitData.FragmentRecordBatch; import org.apache.drill.exec.rpc.RemoteConnection; -import org.apache.drill.exec.rpc.Response; import org.apache.drill.exec.rpc.ResponseSender; import org.apache.drill.exec.rpc.RpcException; import org.apache.drill.exec.work.fragment.FragmentManager; @@ -29,7 +28,7 @@ import org.apache.drill.exec.work.fragment.FragmentManager; public interface DataResponseHandler { public void handle(RemoteConnection connection, FragmentManager manager, FragmentRecordBatch fragmentBatch, - ByteBuf data, ResponseSender responder) throws RpcException; + DrillBuf data, ResponseSender responder) throws RpcException; public void informOutOfMemory(); } http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/5c5cef06/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/data/DataResponseHandlerImpl.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/data/DataResponseHandlerImpl.java b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/data/DataResponseHandlerImpl.java index 5d71c90..c37550f 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/data/DataResponseHandlerImpl.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/data/DataResponseHandlerImpl.java @@ -17,7 +17,7 @@ */ package org.apache.drill.exec.rpc.data; -import io.netty.buffer.ByteBuf; +import io.netty.buffer.DrillBuf; import org.apache.drill.exec.exception.FragmentSetupException; import org.apache.drill.exec.proto.BitData.FragmentRecordBatch; @@ -48,7 +48,7 @@ public class DataResponseHandlerImpl implements DataResponseHandler{ } - public void handle(RemoteConnection connection, FragmentManager manager, FragmentRecordBatch fragmentBatch, ByteBuf data, ResponseSender sender) throws RpcException { + public void handle(RemoteConnection connection, FragmentManager manager, FragmentRecordBatch fragmentBatch, DrillBuf data, ResponseSender sender) throws RpcException { // logger.debug("Fragment Batch received {}", fragmentBatch); try { boolean canRun = manager.handle(new RawFragmentBatch(connection, fragmentBatch, data, sender)); http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/5c5cef06/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/data/DataServer.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/data/DataServer.java b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/data/DataServer.java index 42dee94..8611b5a 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/data/DataServer.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/data/DataServer.java @@ -17,7 +17,7 @@ */ package org.apache.drill.exec.rpc.data; -import io.netty.buffer.AccountingByteBuf; +import io.netty.buffer.DrillBuf; import io.netty.buffer.ByteBuf; import io.netty.channel.Channel; import io.netty.channel.ChannelFuture; @@ -114,11 +114,11 @@ public class DataServer extends BasicServer<RpcType, BitServerConnection> { } BufferAllocator allocator = manager.getFragmentContext().getAllocator(); if(body != null){ - if(!allocator.takeOwnership((AccountingByteBuf) body.unwrap())){ + if(!allocator.takeOwnership((DrillBuf) body.unwrap())){ dataHandler.handle(connection, manager, OOM_FRAGMENT, null, null); } } - dataHandler.handle(connection, manager, fragmentBatch, body, sender); + dataHandler.handle(connection, manager, fragmentBatch, (DrillBuf) body, sender); } catch (FragmentSetupException e) { logger.error("Failure while getting fragment manager. {}", QueryIdHelper.getQueryIdentifier(handle), e); http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/5c5cef06/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/QueryResultBatch.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/QueryResultBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/QueryResultBatch.java index 98e5943..e36a1c6 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/QueryResultBatch.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/QueryResultBatch.java @@ -17,32 +17,33 @@ */ package org.apache.drill.exec.rpc.user; -import io.netty.buffer.ByteBuf; +import io.netty.buffer.DrillBuf; import org.apache.drill.exec.proto.UserBitShared.QueryResult; public class QueryResultBatch { static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(QueryResultBatch.class); - + private final QueryResult header; - private final ByteBuf data; - - public QueryResultBatch(QueryResult header, ByteBuf data) { + private final DrillBuf data; + + public QueryResultBatch(QueryResult header, DrillBuf data) { // logger.debug("New Result Batch with header {} and data {}", header, data); this.header = header; this.data = data; if(this.data != null) data.retain(); } + public QueryResult getHeader() { return header; } - public ByteBuf getData() { + public DrillBuf getData() { return data; } - - + + public boolean hasData(){ return data != null; } @@ -50,12 +51,12 @@ public class QueryResultBatch { public void release(){ if(data != null) data.release(); } - + @Override public String toString() { return "QueryResultBatch [header=" + header + ", data=" + data + "]"; } - - - + + + } http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/5c5cef06/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/QueryResultHandler.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/QueryResultHandler.java b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/QueryResultHandler.java index 95b7446..4bbb13e 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/QueryResultHandler.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/rpc/user/QueryResultHandler.java @@ -18,6 +18,7 @@ package org.apache.drill.exec.rpc.user; import io.netty.buffer.ByteBuf; +import io.netty.buffer.DrillBuf; import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.ConcurrentMap; @@ -55,7 +56,7 @@ public class QueryResultHandler { public void batchArrived(ConnectionThrottle throttle, ByteBuf pBody, ByteBuf dBody) throws RpcException { final QueryResult result = RpcBus.get(pBody, QueryResult.PARSER); - final QueryResultBatch batch = new QueryResultBatch(result, dBody); + final QueryResultBatch batch = new QueryResultBatch(result, (DrillBuf) dBody); final boolean failed = (batch.getHeader().getQueryState() == QueryState.FAILED); assert failed || batch.getHeader().getErrorCount() == 0 : "Error count for the query batch is non-zero but QueryState != FAILED"; http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/5c5cef06/exec/java-exec/src/main/java/org/apache/drill/exec/server/DrillbitContext.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/server/DrillbitContext.java b/exec/java-exec/src/main/java/org/apache/drill/exec/server/DrillbitContext.java index eed0126..7d48711 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/server/DrillbitContext.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/server/DrillbitContext.java @@ -23,6 +23,7 @@ import java.util.Collection; import org.apache.drill.common.config.DrillConfig; import org.apache.drill.exec.cache.DistributedCache; +import org.apache.drill.exec.compile.CodeCompiler; import org.apache.drill.exec.coord.ClusterCoordinator; import org.apache.drill.exec.expr.fn.FunctionImplementationRegistry; import org.apache.drill.exec.memory.BufferAllocator; @@ -57,6 +58,7 @@ public class DrillbitContext { private final FunctionImplementationRegistry functionRegistry; private final SystemOptionManager systemOptions; private final PStoreProvider provider; + private final CodeCompiler compiler; public DrillbitContext(DrillbitEndpoint endpoint, BootStrapContext context, ClusterCoordinator coord, Controller controller, DataConnectionCreator connectionsPool, DistributedCache cache, WorkEventBus workBus, PStoreProvider provider) { super(); @@ -77,8 +79,7 @@ public class DrillbitContext { this.operatorCreatorRegistry = new OperatorCreatorRegistry(context.getConfig()); this.functionRegistry = new FunctionImplementationRegistry(context.getConfig()); this.systemOptions = new SystemOptionManager(context.getConfig(), provider); - -// this.globalDrillOptions = new DistributedGlobalOptions(this.cache); + this.compiler = new CodeCompiler(context.getConfig(), cache, systemOptions); } public FunctionImplementationRegistry getFunctionImplementationRegistry() { @@ -153,4 +154,10 @@ public class DrillbitContext { return coord; } + public CodeCompiler getCompiler() { + return compiler; + } + + + } http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/5c5cef06/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SystemOptionManager.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SystemOptionManager.java b/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SystemOptionManager.java index eddb818..b322086 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SystemOptionManager.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/server/options/SystemOptionManager.java @@ -99,9 +99,10 @@ public class SystemOptionManager implements OptionManager{ .build(); } - public void init() throws IOException{ + public SystemOptionManager init() throws IOException{ this.options = provider.getPStore(config); this.admin = new SystemOptionAdmin(); + return this; } private class Iter implements Iterator<OptionValue>{ http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/5c5cef06/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/JSONRecordReader2.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/JSONRecordReader2.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/JSONRecordReader2.java index cd3cfdc..5d741e0 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/JSONRecordReader2.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/easy/json/JSONRecordReader2.java @@ -59,7 +59,7 @@ public class JSONRecordReader2 implements RecordReader{ List<SchemaPath> columns) throws OutOfMemoryException { this.hadoopPath = new Path(inputPath); this.fileSystem = fileSystem; - this.fragmentContext=fragmentContext; + this.fragmentContext = fragmentContext; } @Override @@ -69,9 +69,9 @@ public class JSONRecordReader2 implements RecordReader{ JsonRecordSplitter splitter = new UTF8JsonRecordSplitter(stream); this.writer = new VectorContainerWriter(output); this.mutator = output; - jsonReader = new JsonReaderWithState(splitter); + jsonReader = new JsonReaderWithState(splitter, fragmentContext.getManagedBuffer()); }catch(Exception e){ - handleAndRaise("Failure reading JSON file.", e); + throw new ExecutionSetupException("Failure reading JSON file.", e); } } http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/5c5cef06/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/FixedByteAlignedReader.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/FixedByteAlignedReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/FixedByteAlignedReader.java index 5c25e11..54771e4 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/FixedByteAlignedReader.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/FixedByteAlignedReader.java @@ -18,16 +18,19 @@ package org.apache.drill.exec.store.parquet.columnreaders; import io.netty.buffer.ByteBuf; +import io.netty.buffer.DrillBuf; + import org.apache.drill.common.exceptions.ExecutionSetupException; -import org.apache.drill.common.util.DecimalUtility; import org.apache.drill.exec.expr.holders.Decimal28SparseHolder; import org.apache.drill.exec.expr.holders.Decimal38SparseHolder; import org.apache.drill.exec.store.ParquetOutputRecordWriter; +import org.apache.drill.exec.util.DecimalUtility; import org.apache.drill.exec.vector.DateVector; import org.apache.drill.exec.vector.Decimal28SparseVector; import org.apache.drill.exec.vector.Decimal38SparseVector; import org.apache.drill.exec.vector.ValueVector; import org.joda.time.DateTimeUtils; + import parquet.column.ColumnDescriptor; import parquet.format.SchemaElement; import parquet.hadoop.metadata.ColumnChunkMetaData; @@ -36,9 +39,9 @@ import java.math.BigDecimal; class FixedByteAlignedReader extends ColumnReader { - protected ByteBuf bytebuf; + protected DrillBuf bytebuf; + - FixedByteAlignedReader(ParquetRecordReader parentReader, int allocateSize, ColumnDescriptor descriptor, ColumnChunkMetaData columnChunkMetaData, boolean fixedLength, ValueVector v, SchemaElement schemaElement) throws ExecutionSetupException { super(parentReader, allocateSize, descriptor, columnChunkMetaData, fixedLength, v, schemaElement); @@ -120,7 +123,7 @@ class FixedByteAlignedReader extends ColumnReader { @Override void addNext(int start, int index) { int width = Decimal28SparseHolder.WIDTH; - BigDecimal intermediate = DecimalUtility.getBigDecimalFromByteBuf(bytebuf, start, dataTypeLengthInBytes, schemaElement.getScale()); + BigDecimal intermediate = DecimalUtility.getBigDecimalFromDrillBuf(bytebuf, start, dataTypeLengthInBytes, schemaElement.getScale()); DecimalUtility.getSparseFromBigDecimal(intermediate, decimal28Vector.getData(), index * width, schemaElement.getScale(), schemaElement.getPrecision(), Decimal28SparseHolder.nDecimalDigits); } @@ -139,7 +142,7 @@ class FixedByteAlignedReader extends ColumnReader { @Override void addNext(int start, int index) { int width = Decimal38SparseHolder.WIDTH; - BigDecimal intermediate = DecimalUtility.getBigDecimalFromByteBuf(bytebuf, start, dataTypeLengthInBytes, schemaElement.getScale()); + BigDecimal intermediate = DecimalUtility.getBigDecimalFromDrillBuf(bytebuf, start, dataTypeLengthInBytes, schemaElement.getScale()); DecimalUtility.getSparseFromBigDecimal(intermediate, decimal38Vector.getData(), index * width, schemaElement.getScale(), schemaElement.getPrecision(), Decimal38SparseHolder.nDecimalDigits); } http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/5c5cef06/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/NullableFixedByteAlignedReaders.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/NullableFixedByteAlignedReaders.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/NullableFixedByteAlignedReaders.java index 4d7f312..f88d56a 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/NullableFixedByteAlignedReaders.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/NullableFixedByteAlignedReaders.java @@ -18,8 +18,10 @@ package org.apache.drill.exec.store.parquet.columnreaders; import io.netty.buffer.ByteBuf; +import io.netty.buffer.DrillBuf; + import org.apache.drill.common.exceptions.ExecutionSetupException; -import org.apache.drill.common.util.DecimalUtility; +import org.apache.drill.exec.util.DecimalUtility; import org.apache.drill.exec.vector.NullableBigIntVector; import org.apache.drill.exec.vector.NullableFloat4Vector; import org.apache.drill.exec.vector.NullableFloat8Vector; @@ -31,8 +33,8 @@ import org.apache.drill.exec.store.ParquetOutputRecordWriter; import org.apache.drill.exec.vector.NullableDateVector; import org.apache.drill.exec.vector.NullableDecimal28SparseVector; import org.apache.drill.exec.vector.NullableDecimal38SparseVector; - import org.joda.time.DateTimeUtils; + import parquet.column.ColumnDescriptor; import parquet.format.SchemaElement; import parquet.hadoop.metadata.ColumnChunkMetaData; @@ -42,7 +44,7 @@ import java.math.BigDecimal; public class NullableFixedByteAlignedReaders { static class NullableFixedByteAlignedReader extends NullableColumnReader { - protected ByteBuf bytebuf; + protected DrillBuf bytebuf; NullableFixedByteAlignedReader(ParquetRecordReader parentReader, int allocateSize, ColumnDescriptor descriptor, ColumnChunkMetaData columnChunkMetaData, boolean fixedLength, ValueVector v, SchemaElement schemaElement) throws ExecutionSetupException { @@ -67,8 +69,6 @@ public class NullableFixedByteAlignedReaders { static class NullableDictionaryIntReader extends NullableColumnReader<NullableIntVector> { - private ByteBuf bytebuf; - NullableDictionaryIntReader(ParquetRecordReader parentReader, int allocateSize, ColumnDescriptor descriptor, ColumnChunkMetaData columnChunkMetaData, boolean fixedLength, NullableIntVector v, SchemaElement schemaElement) throws ExecutionSetupException { @@ -88,8 +88,6 @@ public class NullableFixedByteAlignedReaders { static class NullableDictionaryBigIntReader extends NullableColumnReader<NullableBigIntVector> { - private ByteBuf bytebuf; - NullableDictionaryBigIntReader(ParquetRecordReader parentReader, int allocateSize, ColumnDescriptor descriptor, ColumnChunkMetaData columnChunkMetaData, boolean fixedLength, NullableBigIntVector v, SchemaElement schemaElement) throws ExecutionSetupException { @@ -107,8 +105,6 @@ public class NullableFixedByteAlignedReaders { static class NullableDictionaryFloat4Reader extends NullableColumnReader<NullableFloat4Vector> { - private ByteBuf bytebuf; - NullableDictionaryFloat4Reader(ParquetRecordReader parentReader, int allocateSize, ColumnDescriptor descriptor, ColumnChunkMetaData columnChunkMetaData, boolean fixedLength, NullableFloat4Vector v, SchemaElement schemaElement) throws ExecutionSetupException { @@ -126,8 +122,6 @@ public class NullableFixedByteAlignedReaders { static class NullableDictionaryFloat8Reader extends NullableColumnReader<NullableFloat8Vector> { - private ByteBuf bytebuf; - NullableDictionaryFloat8Reader(ParquetRecordReader parentReader, int allocateSize, ColumnDescriptor descriptor, ColumnChunkMetaData columnChunkMetaData, boolean fixedLength, NullableFloat8Vector v, SchemaElement schemaElement) throws ExecutionSetupException { @@ -211,7 +205,7 @@ public class NullableFixedByteAlignedReaders { @Override void addNext(int start, int index) { int width = NullableDecimal28SparseHolder.WIDTH; - BigDecimal intermediate = DecimalUtility.getBigDecimalFromByteBuf(bytebuf, start, dataTypeLengthInBytes, schemaElement.getScale()); + BigDecimal intermediate = DecimalUtility.getBigDecimalFromDrillBuf(bytebuf, start, dataTypeLengthInBytes, schemaElement.getScale()); DecimalUtility.getSparseFromBigDecimal(intermediate, decimal28Vector.getData(), index * width, schemaElement.getScale(), schemaElement.getPrecision(), NullableDecimal28SparseHolder.nDecimalDigits); } @@ -230,7 +224,7 @@ public class NullableFixedByteAlignedReaders { @Override void addNext(int start, int index) { int width = NullableDecimal38SparseHolder.WIDTH; - BigDecimal intermediate = DecimalUtility.getBigDecimalFromByteBuf(bytebuf, start, dataTypeLengthInBytes, schemaElement.getScale()); + BigDecimal intermediate = DecimalUtility.getBigDecimalFromDrillBuf(bytebuf, start, dataTypeLengthInBytes, schemaElement.getScale()); DecimalUtility.getSparseFromBigDecimal(intermediate, decimal38Vector.getData(), index * width, schemaElement.getScale(), schemaElement.getPrecision(), NullableDecimal38SparseHolder.nDecimalDigits); } http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/5c5cef06/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/NullableVarLengthValuesColumn.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/NullableVarLengthValuesColumn.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/NullableVarLengthValuesColumn.java index 83812ea..ba9ff80 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/NullableVarLengthValuesColumn.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/NullableVarLengthValuesColumn.java @@ -18,8 +18,11 @@ package org.apache.drill.exec.store.parquet.columnreaders; import io.netty.buffer.ByteBuf; +import io.netty.buffer.DrillBuf; + import org.apache.drill.common.exceptions.ExecutionSetupException; import org.apache.drill.exec.vector.ValueVector; + import parquet.bytes.BytesUtils; import parquet.column.ColumnDescriptor; import parquet.format.SchemaElement; @@ -38,7 +41,7 @@ public abstract class NullableVarLengthValuesColumn<V extends ValueVector> exten super(parentReader, allocateSize, descriptor, columnChunkMetaData, fixedLength, v, schemaElement); } - public abstract boolean setSafe(int index, ByteBuf value, int start, int length); + public abstract boolean setSafe(int index, DrillBuf value, int start, int length); public abstract int capacity(); @@ -82,8 +85,7 @@ public abstract class NullableVarLengthValuesColumn<V extends ValueVector> exten } else { // re-purposing this field here for length in BYTES to prevent repetitive multiplication/division - dataTypeLengthInBits = BytesUtils.readIntLittleEndian(pageReader.pageDataByteArray.nioBuffer(), - (int) pageReader.readyToReadPosInBytes); + dataTypeLengthInBits = pageReader.pageDataByteArray.getInt((int) pageReader.readyToReadPosInBytes); } // I think this also needs to happen if it is null for the random access if (! variableWidthVector.getMutator().setValueLengthSafe((int) valuesReadInCurrentPass + pageReader.valuesReadyToRead, dataTypeLengthInBits)) { http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/5c5cef06/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/PageReader.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/PageReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/PageReader.java index 1687c3b..4165cbd 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/PageReader.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/PageReader.java @@ -22,7 +22,9 @@ import java.util.ArrayList; import java.util.List; import io.netty.buffer.ByteBuf; +import io.netty.buffer.DrillBuf; import io.netty.buffer.Unpooled; + import org.apache.drill.common.exceptions.DrillRuntimeException; import org.apache.drill.common.exceptions.ExecutionSetupException; import org.apache.drill.exec.store.parquet.ColumnDataReader; @@ -54,7 +56,7 @@ final class PageReader { // store references to the pages that have been uncompressed, but not copied to ValueVectors yet Page currentPage; // buffer to store bytes of current page - ByteBuf pageDataByteArray; + DrillBuf pageDataByteArray; // for variable length data we need to keep track of our current position in the page data // as the values and lengths are intermixed, making random access to the length data impossible @@ -134,10 +136,10 @@ final class PageReader { throw new ExecutionSetupException("Error opening or reading metadata for parquet file at location: " + path.getName(), e); } - + } - + /** * Grab the next page. * @@ -226,7 +228,7 @@ final class PageReader { return false; } - pageDataByteArray = Unpooled.wrappedBuffer(currentPage.getBytes().toByteBuffer()); + pageDataByteArray = DrillBuf.wrapByteBuffer(currentPage.getBytes().toByteBuffer()); readPosInBytes = 0; if (parentColumnReader.getColumnDescriptor().getMaxRepetitionLevel() > 0) { @@ -272,7 +274,7 @@ final class PageReader { readyToReadPosInBytes = readPosInBytes; return true; } - + public void clear(){ this.dataReader.clear(); // Free all memory, including fixed length types. (Data is being copied for all types not just var length types) http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/5c5cef06/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/VarLengthColumnReaders.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/VarLengthColumnReaders.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/VarLengthColumnReaders.java index 555cb94..ecfa110 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/VarLengthColumnReaders.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/VarLengthColumnReaders.java @@ -17,18 +17,20 @@ ******************************************************************************/ package org.apache.drill.exec.store.parquet.columnreaders; -import java.math.BigDecimal; - import io.netty.buffer.ByteBuf; +import io.netty.buffer.DrillBuf; import io.netty.buffer.Unpooled; + +import java.math.BigDecimal; + import org.apache.drill.common.exceptions.ExecutionSetupException; -import org.apache.drill.common.util.DecimalUtility; -import org.apache.drill.exec.expr.holders.VarCharHolder; -import org.apache.drill.exec.expr.holders.NullableVarCharHolder; -import org.apache.drill.exec.expr.holders.VarBinaryHolder; -import org.apache.drill.exec.expr.holders.NullableVarBinaryHolder; import org.apache.drill.exec.expr.holders.Decimal28SparseHolder; import org.apache.drill.exec.expr.holders.Decimal38SparseHolder; +import org.apache.drill.exec.expr.holders.NullableVarBinaryHolder; +import org.apache.drill.exec.expr.holders.NullableVarCharHolder; +import org.apache.drill.exec.expr.holders.VarBinaryHolder; +import org.apache.drill.exec.expr.holders.VarCharHolder; +import org.apache.drill.exec.util.DecimalUtility; import org.apache.drill.exec.vector.Decimal28SparseVector; import org.apache.drill.exec.vector.Decimal38SparseVector; import org.apache.drill.exec.vector.NullableDecimal28SparseVector; @@ -37,6 +39,7 @@ import org.apache.drill.exec.vector.NullableVarBinaryVector; import org.apache.drill.exec.vector.NullableVarCharVector; import org.apache.drill.exec.vector.VarBinaryVector; import org.apache.drill.exec.vector.VarCharVector; + import parquet.column.ColumnDescriptor; import parquet.format.SchemaElement; import parquet.hadoop.metadata.ColumnChunkMetaData; @@ -56,9 +59,9 @@ public class VarLengthColumnReaders { } @Override - public boolean setSafe(int index, ByteBuf bytebuf, int start, int length) { + public boolean setSafe(int index, DrillBuf bytebuf, int start, int length) { int width = Decimal28SparseHolder.WIDTH; - BigDecimal intermediate = DecimalUtility.getBigDecimalFromByteBuf(bytebuf, start, length, schemaElement.getScale()); + BigDecimal intermediate = DecimalUtility.getBigDecimalFromDrillBuf(bytebuf, start, length, schemaElement.getScale()); if (index >= decimal28Vector.getValueCapacity()) { return false; } @@ -85,9 +88,9 @@ public class VarLengthColumnReaders { } @Override - public boolean setSafe(int index, ByteBuf bytebuf, int start, int length) { + public boolean setSafe(int index, DrillBuf bytebuf, int start, int length) { int width = Decimal28SparseHolder.WIDTH; - BigDecimal intermediate = DecimalUtility.getBigDecimalFromByteBuf(bytebuf, start, length, schemaElement.getScale()); + BigDecimal intermediate = DecimalUtility.getBigDecimalFromDrillBuf(bytebuf, start, length, schemaElement.getScale()); if (index >= nullableDecimal28Vector.getValueCapacity()) { return false; } @@ -115,9 +118,9 @@ public class VarLengthColumnReaders { } @Override - public boolean setSafe(int index, ByteBuf bytebuf, int start, int length) { + public boolean setSafe(int index, DrillBuf bytebuf, int start, int length) { int width = Decimal38SparseHolder.WIDTH; - BigDecimal intermediate = DecimalUtility.getBigDecimalFromByteBuf(bytebuf, start, length, schemaElement.getScale()); + BigDecimal intermediate = DecimalUtility.getBigDecimalFromDrillBuf(bytebuf, start, length, schemaElement.getScale()); if (index >= decimal28Vector.getValueCapacity()) { return false; } @@ -144,9 +147,9 @@ public class VarLengthColumnReaders { } @Override - public boolean setSafe(int index, ByteBuf bytebuf, int start, int length) { + public boolean setSafe(int index, DrillBuf bytebuf, int start, int length) { int width = Decimal38SparseHolder.WIDTH; - BigDecimal intermediate = DecimalUtility.getBigDecimalFromByteBuf(bytebuf, start, length, schemaElement.getScale()); + BigDecimal intermediate = DecimalUtility.getBigDecimalFromDrillBuf(bytebuf, start, length, schemaElement.getScale()); if (index >= nullableDecimal38Vector.getValueCapacity()) { return false; } @@ -176,12 +179,12 @@ public class VarLengthColumnReaders { } @Override - public boolean setSafe(int index, ByteBuf bytebuf, int start, int length) { + public boolean setSafe(int index, DrillBuf bytebuf, int start, int length) { boolean success; if(index >= varCharVector.getValueCapacity()) return false; if (usingDictionary) { - ByteBuf b = Unpooled.wrappedBuffer(currDictVal.toByteBuffer()); + DrillBuf b = DrillBuf.wrapByteBuffer(currDictVal.toByteBuffer()); int st=0; int len=currDictVal.length(); VarCharHolder holder = new VarCharHolder(); @@ -211,44 +214,34 @@ public class VarLengthColumnReaders { int nullsRead; boolean currentValNull = false; // store a hard reference to the vector (which is also stored in the superclass) to prevent repetitive casting - protected NullableVarCharVector nullableVarCharVector; + protected final NullableVarCharVector.Mutator mutator; + private final NullableVarCharVector vector; NullableVarCharColumn(ParquetRecordReader parentReader, int allocateSize, ColumnDescriptor descriptor, ColumnChunkMetaData columnChunkMetaData, boolean fixedLength, NullableVarCharVector v, SchemaElement schemaElement) throws ExecutionSetupException { super(parentReader, allocateSize, descriptor, columnChunkMetaData, fixedLength, v, schemaElement); - nullableVarCharVector = v; + vector = v; + this.mutator = vector.getMutator(); } - public boolean setSafe(int index, ByteBuf value, int start, int length) { + public boolean setSafe(int index, DrillBuf value, int start, int length) { boolean success; - if(index >= nullableVarCharVector.getValueCapacity()) return false; + if(index >= vector.getValueCapacity()) return false; if (usingDictionary) { - ByteBuf b = Unpooled.wrappedBuffer(currDictVal.toByteBuffer()); - int st=0; - int len=currDictVal.length(); - NullableVarCharHolder holder = new NullableVarCharHolder(); - holder.buffer=b; - holder.start=0; - holder.end=currDictVal.length(); - success = nullableVarCharVector.getMutator().setSafe(index, holder); - holder.isSet=1; + DrillBuf b = DrillBuf.wrapByteBuffer(currDictVal.toByteBuffer()); + success = mutator.setSafe(index, 1, 0, currDictVal.length(), b); } else { - NullableVarCharHolder holder = new NullableVarCharHolder(); - holder.buffer=value; - holder.start=start; - holder.end=start+length; - holder.isSet=1; - success = nullableVarCharVector.getMutator().setSafe(index, holder); + success = mutator.setSafe(index, 1, start, start+length, value); } return success; } @Override public int capacity() { - return nullableVarCharVector.getData().capacity(); + return vector.getData().capacity(); } } @@ -265,12 +258,12 @@ public class VarLengthColumnReaders { } @Override - public boolean setSafe(int index, ByteBuf value, int start, int length) { + public boolean setSafe(int index, DrillBuf value, int start, int length) { boolean success; if(index >= varBinaryVector.getValueCapacity()) return false; if (usingDictionary) { - ByteBuf b = Unpooled.wrappedBuffer(currDictVal.toByteBuffer()); + DrillBuf b = DrillBuf.wrapByteBuffer(currDictVal.toByteBuffer()); int st=0; int len=currDictVal.length(); VarBinaryHolder holder = new VarBinaryHolder(); @@ -309,14 +302,12 @@ public class VarLengthColumnReaders { nullableVarBinaryVector = v; } - public boolean setSafe(int index, ByteBuf value, int start, int length) { + public boolean setSafe(int index, DrillBuf value, int start, int length) { boolean success; if(index >= nullableVarBinaryVector.getValueCapacity()) return false; if (usingDictionary) { - ByteBuf b = Unpooled.wrappedBuffer(currDictVal.toByteBuffer()); - int st=0; - int len=currDictVal.length(); + DrillBuf b = DrillBuf.wrapByteBuffer(currDictVal.toByteBuffer()); NullableVarBinaryHolder holder = new NullableVarBinaryHolder(); holder.buffer=b; holder.start=0; http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/5c5cef06/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/VarLengthValuesColumn.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/VarLengthValuesColumn.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/VarLengthValuesColumn.java index 7f16c64..829b44a 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/VarLengthValuesColumn.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet/columnreaders/VarLengthValuesColumn.java @@ -18,9 +18,12 @@ package org.apache.drill.exec.store.parquet.columnreaders; import io.netty.buffer.ByteBuf; +import io.netty.buffer.DrillBuf; + import org.apache.drill.common.exceptions.ExecutionSetupException; import org.apache.drill.exec.vector.ValueVector; import org.apache.drill.exec.vector.VariableWidthVector; + import parquet.bytes.BytesUtils; import parquet.column.ColumnDescriptor; import parquet.format.Encoding; @@ -49,7 +52,7 @@ public abstract class VarLengthValuesColumn<V extends ValueVector> extends VarLe } } - public abstract boolean setSafe(int index, ByteBuf bytes, int start, int length); + public abstract boolean setSafe(int index, DrillBuf bytes, int start, int length); @Override protected void readField(long recordToRead) { @@ -80,8 +83,7 @@ public abstract class VarLengthValuesColumn<V extends ValueVector> extends VarLe protected boolean readAndStoreValueSizeInformation() throws IOException { // re-purposing this field here for length in BYTES to prevent repetitive multiplication/division try { - dataTypeLengthInBits = BytesUtils.readIntLittleEndian(pageReader.pageDataByteArray.nioBuffer(), - (int) pageReader.readyToReadPosInBytes); + dataTypeLengthInBits = pageReader.pageDataByteArray.getInt((int) pageReader.readyToReadPosInBytes); } catch (Throwable t) { throw t; } http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/5c5cef06/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet2/DrillParquetGroupConverter.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet2/DrillParquetGroupConverter.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet2/DrillParquetGroupConverter.java index f9bac65..c6310b1 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet2/DrillParquetGroupConverter.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet2/DrillParquetGroupConverter.java @@ -17,12 +17,11 @@ */ package org.apache.drill.exec.store.parquet2; -import com.google.common.collect.Lists; +import io.netty.buffer.DrillBuf; -import io.netty.buffer.ByteBuf; -import io.netty.buffer.Unpooled; +import java.math.BigDecimal; +import java.util.List; -import org.apache.drill.common.util.DecimalUtility; import org.apache.drill.exec.expr.holders.BigIntHolder; import org.apache.drill.exec.expr.holders.BitHolder; import org.apache.drill.exec.expr.holders.DateHolder; @@ -37,8 +36,9 @@ import org.apache.drill.exec.expr.holders.TimeHolder; import org.apache.drill.exec.expr.holders.TimeStampHolder; import org.apache.drill.exec.expr.holders.VarBinaryHolder; import org.apache.drill.exec.expr.holders.VarCharHolder; +import org.apache.drill.exec.physical.impl.OutputMutator; import org.apache.drill.exec.store.ParquetOutputRecordWriter; -import org.apache.drill.exec.store.parquet.columnreaders.NullableFixedByteAlignedReaders; +import org.apache.drill.exec.util.DecimalUtility; import org.apache.drill.exec.vector.complex.impl.ComplexWriterImpl; import org.apache.drill.exec.vector.complex.writer.BaseWriter.MapWriter; import org.apache.drill.exec.vector.complex.writer.BigIntWriter; @@ -68,30 +68,31 @@ import parquet.schema.PrimitiveType; import parquet.schema.Type; import parquet.schema.Type.Repetition; -import java.math.BigDecimal; -import java.util.List; +import com.google.common.collect.Lists; public class DrillParquetGroupConverter extends GroupConverter { private List<Converter> converters; private MapWriter mapWriter; + private final OutputMutator mutator; - public DrillParquetGroupConverter(ComplexWriterImpl complexWriter, MessageType schema) { - this(complexWriter.rootAsMap(), schema); + public DrillParquetGroupConverter(OutputMutator mutator, ComplexWriterImpl complexWriter, MessageType schema) { + this(mutator, complexWriter.rootAsMap(), schema); } - public DrillParquetGroupConverter(MapWriter mapWriter, GroupType schema) { + public DrillParquetGroupConverter(OutputMutator mutator, MapWriter mapWriter, GroupType schema) { this.mapWriter = mapWriter; + this.mutator = mutator; converters = Lists.newArrayList(); for (Type type : schema.getFields()) { Repetition rep = type.getRepetition(); boolean isPrimitive = type.isPrimitive(); if (!isPrimitive) { if (rep != Repetition.REPEATED) { - DrillParquetGroupConverter converter = new DrillParquetGroupConverter(mapWriter.map(type.getName()), type.asGroupType()); + DrillParquetGroupConverter converter = new DrillParquetGroupConverter(mutator, mapWriter.map(type.getName()), type.asGroupType()); converters.add(converter); } else { - DrillParquetGroupConverter converter = new DrillParquetGroupConverter(mapWriter.list(type.getName()).map(), type.asGroupType()); + DrillParquetGroupConverter converter = new DrillParquetGroupConverter(mutator, mapWriter.list(type.getName()).map(), type.asGroupType()); converters.add(converter); } } else { @@ -162,21 +163,21 @@ public class DrillParquetGroupConverter extends GroupConverter { case BINARY: { if (type.getOriginalType() == null) { VarBinaryWriter writer = type.getRepetition() == Repetition.REPEATED ? mapWriter.list(name).varBinary() : mapWriter.varBinary(name); - return new DrillVarBinaryConverter(writer); + return new DrillVarBinaryConverter(writer, mutator.getManagedBuffer()); } switch(type.getOriginalType()) { case UTF8: { VarCharWriter writer = type.getRepetition() == Repetition.REPEATED ? mapWriter.list(name).varChar() : mapWriter.varChar(name); - return new DrillVarCharConverter(writer); + return new DrillVarCharConverter(writer, mutator.getManagedBuffer()); } case DECIMAL: { DecimalMetadata metadata = type.getDecimalMetadata(); if (metadata.getPrecision() <= 28) { Decimal28SparseWriter writer = type.getRepetition() == Repetition.REPEATED ? mapWriter.list(name).decimal28Sparse() : mapWriter.decimal28Sparse(name); - return new DrillBinaryToDecimal28Converter(writer, metadata.getPrecision(), metadata.getScale()); + return new DrillBinaryToDecimal28Converter(writer, metadata.getPrecision(), metadata.getScale(), mutator.getManagedBuffer()); } else { Decimal38SparseWriter writer = type.getRepetition() == Repetition.REPEATED ? mapWriter.list(name).decimal38Sparse() : mapWriter.decimal38Sparse(name); - return new DrillBinaryToDecimal38Converter(writer, metadata.getPrecision(), metadata.getScale()); + return new DrillBinaryToDecimal38Converter(writer, metadata.getPrecision(), metadata.getScale(), mutator.getManagedBuffer()); } } default: { @@ -363,16 +364,18 @@ public class DrillParquetGroupConverter extends GroupConverter { public static class DrillVarBinaryConverter extends PrimitiveConverter { private VarBinaryWriter writer; + private DrillBuf buf; private VarBinaryHolder holder = new VarBinaryHolder(); - public DrillVarBinaryConverter(VarBinaryWriter writer) { + public DrillVarBinaryConverter(VarBinaryWriter writer, DrillBuf buf) { this.writer = writer; + this.buf = buf; } @Override public void addBinary(Binary value) { - ByteBuf buf = Unpooled.wrappedBuffer(value.toByteBuffer()); - holder.buffer = buf; + holder.buffer = buf = buf.reallocIfNeeded(value.length()); + buf.setBytes(0, value.toByteBuffer()); holder.start = 0; holder.end = value.length(); writer.write(holder); @@ -382,15 +385,17 @@ public class DrillParquetGroupConverter extends GroupConverter { public static class DrillVarCharConverter extends PrimitiveConverter { private VarCharWriter writer; private VarCharHolder holder = new VarCharHolder(); + private DrillBuf buf; - public DrillVarCharConverter(VarCharWriter writer) { + public DrillVarCharConverter(VarCharWriter writer, DrillBuf buf) { this.writer = writer; + this.buf = buf; } @Override public void addBinary(Binary value) { - ByteBuf buf = Unpooled.wrappedBuffer(value.toByteBuffer()); - holder.buffer = buf; + holder.buffer = buf = buf.reallocIfNeeded(value.length()); + buf.setBytes(0, value.toByteBuffer()); holder.start = 0; holder.end = value.length(); writer.write(holder); @@ -400,9 +405,11 @@ public class DrillParquetGroupConverter extends GroupConverter { public static class DrillBinaryToDecimal28Converter extends PrimitiveConverter { private Decimal28SparseWriter writer; private Decimal28SparseHolder holder = new Decimal28SparseHolder(); + private DrillBuf buf; - public DrillBinaryToDecimal28Converter(Decimal28SparseWriter writer, int precision, int scale) { + public DrillBinaryToDecimal28Converter(Decimal28SparseWriter writer, int precision, int scale, DrillBuf buf) { this.writer = writer; + this.buf = buf.reallocIfNeeded(28); holder.precision = precision; holder.scale = scale; } @@ -410,7 +417,6 @@ public class DrillParquetGroupConverter extends GroupConverter { @Override public void addBinary(Binary value) { BigDecimal bigDecimal = DecimalUtility.getBigDecimalFromByteArray(value.getBytes(), 0, value.length(), holder.scale); - ByteBuf buf = Unpooled.wrappedBuffer(new byte[28]); DecimalUtility.getSparseFromBigDecimal(bigDecimal, buf, 0, holder.scale, holder.precision, Decimal28SparseHolder.nDecimalDigits); holder.buffer = buf; writer.write(holder); @@ -420,9 +426,11 @@ public class DrillParquetGroupConverter extends GroupConverter { public static class DrillBinaryToDecimal38Converter extends PrimitiveConverter { private Decimal38SparseWriter writer; private Decimal38SparseHolder holder = new Decimal38SparseHolder(); + private DrillBuf buf; - public DrillBinaryToDecimal38Converter(Decimal38SparseWriter writer, int precision, int scale) { + public DrillBinaryToDecimal38Converter(Decimal38SparseWriter writer, int precision, int scale, DrillBuf buf) { this.writer = writer; + this.buf = buf.reallocIfNeeded(38); holder.precision = precision; holder.scale = scale; } @@ -430,7 +438,6 @@ public class DrillParquetGroupConverter extends GroupConverter { @Override public void addBinary(Binary value) { BigDecimal bigDecimal = DecimalUtility.getBigDecimalFromByteArray(value.getBytes(), 0, value.length(), holder.scale); - ByteBuf buf = Unpooled.wrappedBuffer(new byte[38]); DecimalUtility.getSparseFromBigDecimal(bigDecimal, buf, 0, holder.scale, holder.precision, Decimal38SparseHolder.nDecimalDigits); holder.buffer = buf; writer.write(holder); http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/5c5cef06/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet2/DrillParquetReader.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet2/DrillParquetReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet2/DrillParquetReader.java index f47acab..16f520c 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet2/DrillParquetReader.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet2/DrillParquetReader.java @@ -156,7 +156,7 @@ public class DrillParquetReader implements RecordReader { } writer = new VectorContainerWriter(output); - recordMaterializer = new DrillParquetRecordMaterializer(writer, projection); + recordMaterializer = new DrillParquetRecordMaterializer(output, writer, projection); primitiveVectors = writer.getMapVector().getPrimitiveVectors(); recordReader = columnIO.getRecordReader(pageReadStore, recordMaterializer); } catch (Exception e) { http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/5c5cef06/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet2/DrillParquetRecordMaterializer.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet2/DrillParquetRecordMaterializer.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet2/DrillParquetRecordMaterializer.java index 69893dc..f9c3480 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet2/DrillParquetRecordMaterializer.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/parquet2/DrillParquetRecordMaterializer.java @@ -17,24 +17,21 @@ */ package org.apache.drill.exec.store.parquet2; -import org.apache.drill.exec.vector.ValueVector; -import org.apache.drill.exec.vector.complex.MapVector; -import org.apache.drill.exec.vector.complex.impl.ComplexWriterImpl; +import org.apache.drill.exec.physical.impl.OutputMutator; import org.apache.drill.exec.vector.complex.writer.BaseWriter.ComplexWriter; + import parquet.io.api.GroupConverter; import parquet.io.api.RecordMaterializer; import parquet.schema.MessageType; -import java.util.List; - public class DrillParquetRecordMaterializer extends RecordMaterializer<Void> { public DrillParquetGroupConverter root; private ComplexWriter complexWriter; - public DrillParquetRecordMaterializer(ComplexWriter complexWriter, MessageType schema) { + public DrillParquetRecordMaterializer(OutputMutator mutator, ComplexWriter complexWriter, MessageType schema) { this.complexWriter = complexWriter; - root = new DrillParquetGroupConverter(complexWriter.rootAsMap(), schema); + root = new DrillParquetGroupConverter(mutator, complexWriter.rootAsMap(), schema); } public void setPosition(int position) { http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/5c5cef06/exec/java-exec/src/main/java/org/apache/drill/exec/store/pojo/PojoRecordReader.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/pojo/PojoRecordReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/pojo/PojoRecordReader.java index 9a5920d..48e09aa 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/pojo/PojoRecordReader.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/pojo/PojoRecordReader.java @@ -18,8 +18,10 @@ package org.apache.drill.exec.store.pojo; import java.lang.reflect.Field; +import java.lang.reflect.Modifier; import java.sql.Timestamp; import java.util.Iterator; +import java.util.List; import org.apache.drill.common.exceptions.ExecutionSetupException; import org.apache.drill.exec.exception.SchemaChangeException; @@ -35,8 +37,12 @@ import org.apache.drill.exec.store.pojo.Writers.NBigIntWriter; import org.apache.drill.exec.store.pojo.Writers.NBooleanWriter; import org.apache.drill.exec.store.pojo.Writers.NDoubleWriter; import org.apache.drill.exec.store.pojo.Writers.NIntWriter; -import org.apache.drill.exec.store.pojo.Writers.StringWriter; import org.apache.drill.exec.store.pojo.Writers.NTimeStampWriter; +import org.apache.drill.exec.store.pojo.Writers.StringWriter; + +import com.google.common.collect.Lists; + + public class PojoRecordReader<T> implements RecordReader{ static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(PojoRecordReader.class); @@ -67,42 +73,51 @@ public class PojoRecordReader<T> implements RecordReader{ public void setup(OutputMutator output) throws ExecutionSetupException { try{ Field[] fields = pojoClass.getDeclaredFields(); - writers = new PojoWriter[fields.length]; - for(int i = 0; i < writers.length; i++){ + List<PojoWriter> writers = Lists.newArrayList(); + + for(int i = 0; i < fields.length; i++){ Field f = fields[i]; - Class<?> type = f.getType(); + if(Modifier.isStatic(f.getModifiers())) continue; + + Class<?> type = f.getType(); + PojoWriter w = null; if(type == int.class){ - writers[i] = new IntWriter(f); + w = new IntWriter(f); }else if(type == Integer.class){ - writers[i] = new NIntWriter(f); + w = new NIntWriter(f); }else if(type == Long.class){ - writers[i] = new NBigIntWriter(f); + w = new NBigIntWriter(f); }else if(type == Boolean.class){ - writers[i] = new NBooleanWriter(f); + w = new NBooleanWriter(f); }else if(type == double.class){ - writers[i] = new DoubleWriter(f); + w = new DoubleWriter(f); }else if(type == Double.class){ - writers[i] = new NDoubleWriter(f); + w = new NDoubleWriter(f); }else if(type.isEnum()){ - writers[i] = new EnumWriter(f); + w = new EnumWriter(f, output.getManagedBuffer()); }else if(type == boolean.class){ - writers[i] = new BitWriter(f); + w = new BitWriter(f); }else if(type == long.class){ - writers[i] = new LongWriter(f); + w = new LongWriter(f); }else if(type == String.class){ - writers[i] = new StringWriter(f); + w = new StringWriter(f, output.getManagedBuffer()); }else if (type == Timestamp.class) { - writers[i] = new NTimeStampWriter(f); + w = new NTimeStampWriter(f); }else{ throw new ExecutionSetupException(String.format("PojoRecord reader doesn't yet support conversions from type [%s].", type)); } - writers[i].init(output); + writers.add(w); + w.init(output); } + + this.writers = writers.toArray(new PojoWriter[writers.size()]); + }catch(SchemaChangeException e){ throw new ExecutionSetupException("Failure while setting up schema for PojoRecordReader.", e); } + } private void allocate(){ http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/5c5cef06/exec/java-exec/src/main/java/org/apache/drill/exec/store/pojo/Writers.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/pojo/Writers.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/pojo/Writers.java index 03732a0..8227695 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/pojo/Writers.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/pojo/Writers.java @@ -17,8 +17,7 @@ */ package org.apache.drill.exec.store.pojo; -import io.netty.buffer.ByteBuf; -import io.netty.buffer.UnpooledByteBufAllocator; +import io.netty.buffer.DrillBuf; import java.lang.reflect.Field; import java.sql.Timestamp; @@ -34,8 +33,8 @@ import org.apache.drill.exec.vector.NullableBigIntVector; import org.apache.drill.exec.vector.NullableBitVector; import org.apache.drill.exec.vector.NullableFloat8Vector; import org.apache.drill.exec.vector.NullableIntVector; -import org.apache.drill.exec.vector.NullableVarCharVector; import org.apache.drill.exec.vector.NullableTimeStampVector; +import org.apache.drill.exec.vector.NullableVarCharVector; import com.google.common.base.Charsets; @@ -105,23 +104,20 @@ public class Writers { } private abstract static class AbstractStringWriter extends AbstractWriter<NullableVarCharVector>{ - private ByteBuf data; + private DrillBuf data; private final NullableVarCharHolder h = new NullableVarCharHolder(); - public AbstractStringWriter(Field field) { + public AbstractStringWriter(Field field, DrillBuf managedBuf) { super(field, Types.optional(MinorType.VARCHAR)); + this.data = managedBuf; ensureLength(100); } void ensureLength(int len){ - if(data == null || data.capacity() < len){ - if(data != null) data.release(); - data = UnpooledByteBufAllocator.DEFAULT.buffer(len); - } + data = data.reallocIfNeeded(len); } public void cleanup(){ - data.release(); } public boolean writeString(String s, int outboundIndex) throws IllegalArgumentException, IllegalAccessException { @@ -145,8 +141,8 @@ public class Writers { } public static class EnumWriter extends AbstractStringWriter{ - public EnumWriter(Field field) { - super(field); + public EnumWriter(Field field, DrillBuf managedBuf) { + super(field, managedBuf); if(!field.getType().isEnum()) throw new IllegalStateException(); } @@ -159,8 +155,8 @@ public class Writers { } public static class StringWriter extends AbstractStringWriter { - public StringWriter(Field field) { - super(field); + public StringWriter(Field field, DrillBuf managedBuf) { + super(field, managedBuf); if(field.getType() != String.class) throw new IllegalStateException(); } http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/5c5cef06/exec/java-exec/src/main/java/org/apache/drill/exec/util/ByteBufUtil.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/util/ByteBufUtil.java b/exec/java-exec/src/main/java/org/apache/drill/exec/util/ByteBufUtil.java index e7bdbcf..37e6040 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/util/ByteBufUtil.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/util/ByteBufUtil.java @@ -17,42 +17,19 @@ */ package org.apache.drill.exec.util; -import static java.nio.ByteOrder.LITTLE_ENDIAN; -import io.netty.buffer.ByteBuf; -import io.netty.buffer.Unpooled; +import io.netty.buffer.DrillBuf; import java.io.DataInput; import org.apache.drill.common.util.DrillStringUtils; public class ByteBufUtil { - /** - * Creates a wrapped {@link ByteBuf} of {@code size} number of bytes with - * the Drill's default endianness(LITTLE_ENDIAN). - * - * @param size - * @return - */ - public static ByteBuf createBuffer(int size) { - return Unpooled.wrappedBuffer(new byte[size]).order(LITTLE_ENDIAN); - } - - /** - * Creates a {@link ByteBuf} which wraps the provided byte array with - * the Drill's default endianness(LITTLE_ENDIAN). - * - * @param size - * @return - */ - public static ByteBuf createBuffer(byte[] source) { - return Unpooled.wrappedBuffer(source).order(LITTLE_ENDIAN); - } /** * Verifies that the the space provided in the buffer is of specified size. * @throws IllegalArgumentException if the specified boundaries do not describe the expected size. */ - public static void checkBufferLength(ByteBuf buffer, int start, int end, int requiredLen) { + public static void checkBufferLength(DrillBuf buffer, int start, int end, int requiredLen) { int actualLen = (end - start); if (actualLen != requiredLen) { throw new IllegalArgumentException(String.format("Wrong length %d(%d-%d) in the buffer '%s', expected %d.", @@ -62,7 +39,7 @@ public class ByteBufUtil { /** * Modeled after {@code org.apache.hadoop.io.WritableUtils}. - * We copy the code to avoid wrapping {@link ByteBuf} to/from {@link DataInput}. + * We copy the code to avoid wrapping {@link DrillBuf} to/from {@link DataInput}. */ public static class HadoopWritables { /** @@ -76,10 +53,10 @@ public class ByteBufUtil { * is negative, with number of bytes that follow are -(v+124). Bytes are * stored in the high-non-zero-byte-first order. * - * @param buffer ByteBuf to read from + * @param buffer DrillBuf to read from * @param i Integer to be serialized */ - public static void writeVInt(ByteBuf buffer, int start, int end, int i) { + public static void writeVInt(DrillBuf buffer, int start, int end, int i) { writeVLong(buffer, start, end, i); } @@ -94,10 +71,10 @@ public class ByteBufUtil { * is negative, with number of bytes that follow are -(v+120). Bytes are * stored in the high-non-zero-byte-first order. * - * @param buffer ByteBuf to write to + * @param buffer DrillBuf to write to * @param i Long to be serialized */ - public static void writeVLong(ByteBuf buffer, int start, int end, long i) { + public static void writeVLong(DrillBuf buffer, int start, int end, long i) { int availableBytes = (end-start); if (availableBytes < getVIntSize(i)) { throw new NumberFormatException("Expected " + getVIntSize(i) + " bytes but the buffer '" @@ -136,10 +113,10 @@ public class ByteBufUtil { /** * Reads a zero-compressed encoded integer from input stream and returns it. - * @param buffer ByteBuf to read from + * @param buffer DrillBuf to read from * @return deserialized integer from stream. */ - public static int readVInt(ByteBuf buffer, int start, int end) { + public static int readVInt(DrillBuf buffer, int start, int end) { long n = readVLong(buffer, start, end); if ((n > Integer.MAX_VALUE) || (n < Integer.MIN_VALUE)) { throw new NumberFormatException("Value " + n + " too long to fit in integer"); @@ -149,10 +126,10 @@ public class ByteBufUtil { /** * Reads a zero-compressed encoded long from input stream and returns it. - * @param buffer ByteBuf to read from + * @param buffer DrillBuf to read from * @return deserialized long from stream. */ - public static long readVLong(ByteBuf buffer, int start, int end) { + public static long readVLong(DrillBuf buffer, int start, int end) { buffer.readerIndex(start); byte firstByte = buffer.readByte(); int len = decodeVIntSize(firstByte);
