DRILL-969: Improvements to ValueVector allocations
Project: http://git-wip-us.apache.org/repos/asf/incubator-drill/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-drill/commit/2712c3c3 Tree: http://git-wip-us.apache.org/repos/asf/incubator-drill/tree/2712c3c3 Diff: http://git-wip-us.apache.org/repos/asf/incubator-drill/diff/2712c3c3 Branch: refs/heads/master Commit: 2712c3c35f8fc1e5bd7443c7e40d6b76fad9d49d Parents: 734f9a8 Author: Steven Phillips <[email protected]> Authored: Mon Jun 9 16:39:56 2014 -0700 Committer: Jacques Nadeau <[email protected]> Committed: Wed Jun 11 21:24:55 2014 -0700 ---------------------------------------------------------------------- .../codegen/templates/FixedValueVectors.java | 15 ++++--- .../codegen/templates/RepeatedValueVectors.java | 9 ++-- .../templates/VariableLengthVectors.java | 23 +++++----- .../impl/svremover/RemovingRecordBatch.java | 46 +++++++++++++++----- .../exec/record/AbstractSingleRecordBatch.java | 7 +++ .../exec/store/text/DrillTextRecordReader.java | 13 ++++-- 6 files changed, 78 insertions(+), 35 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/2712c3c3/exec/java-exec/src/main/codegen/templates/FixedValueVectors.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/codegen/templates/FixedValueVectors.java b/exec/java-exec/src/main/codegen/templates/FixedValueVectors.java index af31f64..a83ec97 100644 --- a/exec/java-exec/src/main/codegen/templates/FixedValueVectors.java +++ b/exec/java-exec/src/main/codegen/templates/FixedValueVectors.java @@ -53,7 +53,7 @@ public final class ${minor.class}Vector extends BaseDataValueVector implements F private final Accessor accessor = new Accessor(); private final Mutator mutator = new Mutator(); - private int allocationValueCount = 4000; + private int allocationValueCount = 4096; private int allocationMonitor = 0; public ${minor.class}Vector(MaterializedField field, BufferAllocator allocator) { @@ -81,11 +81,11 @@ public final class ${minor.class}Vector extends BaseDataValueVector implements F public boolean allocateNewSafe() { clear(); - if (allocationMonitor > 5) { - allocationValueCount = Math.max(2, (int) (allocationValueCount * 0.9)); + if (allocationMonitor > 10) { + allocationValueCount = Math.max(8, (int) (allocationValueCount / 2)); allocationMonitor = 0; - } else if (allocationMonitor < -5) { - allocationValueCount = (int) (allocationValueCount * 1.1); + } else if (allocationMonitor < -2) { + allocationValueCount = (int) (allocationValueCount * 2); allocationMonitor = 0; } this.data = allocator.buffer(allocationValueCount * ${type.width}); @@ -102,6 +102,7 @@ public final class ${minor.class}Vector extends BaseDataValueVector implements F clear(); this.data = allocator.buffer(valueCount * ${type.width}); this.data.readerIndex(0); + this.allocationValueCount = valueCount; } /** @@ -815,8 +816,10 @@ public final class ${minor.class}Vector extends BaseDataValueVector implements F int currentValueCapacity = getValueCapacity(); ${minor.class}Vector.this.valueCount = valueCount; int idx = (${type.width} * valueCount); - if (((float) currentValueCapacity) / idx > 1.1) { + if (valueCount > 0 && currentValueCapacity > idx * 2) { allocationMonitor++; + } else if (allocationMonitor > 0) { + allocationMonitor--; } data.writerIndex(idx); if (data instanceof AccountingByteBuf) { http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/2712c3c3/exec/java-exec/src/main/codegen/templates/RepeatedValueVectors.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/codegen/templates/RepeatedValueVectors.java b/exec/java-exec/src/main/codegen/templates/RepeatedValueVectors.java index 48efc16..7bf84f2 100644 --- a/exec/java-exec/src/main/codegen/templates/RepeatedValueVectors.java +++ b/exec/java-exec/src/main/codegen/templates/RepeatedValueVectors.java @@ -98,6 +98,8 @@ package org.apache.drill.exec.vector; int endValue = offsets.getAccessor().get(startIndex + length); values.splitAndTransferTo(startValue, endValue - startValue, target.values); offsets.splitAndTransferTo(startIndex, length, target.offsets); + target.parentValueCount = parentValueCount; + target.childValueCount = childValueCount; sliceOffset = startIndex; } @@ -369,8 +371,7 @@ package org.apache.drill.exec.vector; if(getValueCapacity() <= index){ return false; } - offsets.getMutator().set(index+1, offsets.getAccessor().get(index)); - return true; + return offsets.getMutator().setSafe(index+1, offsets.getAccessor().get(index)); } /** @@ -392,7 +393,9 @@ package org.apache.drill.exec.vector; } public boolean addSafe(int index, byte[] bytes, int start, int length) { - if(offsets.getValueCapacity() <= index+1) return false; + if(offsets.getValueCapacity() <= index+1) { + return false; + } int nextOffset = offsets.getAccessor().get(index+1); boolean b1 = values.getMutator().setSafe(nextOffset, bytes, start, length); boolean b2 = offsets.getMutator().setSafe(index+1, nextOffset+1); http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/2712c3c3/exec/java-exec/src/main/codegen/templates/VariableLengthVectors.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/codegen/templates/VariableLengthVectors.java b/exec/java-exec/src/main/codegen/templates/VariableLengthVectors.java index 6a2dfd3..22a668d 100644 --- a/exec/java-exec/src/main/codegen/templates/VariableLengthVectors.java +++ b/exec/java-exec/src/main/codegen/templates/VariableLengthVectors.java @@ -49,7 +49,7 @@ public final class ${minor.class}Vector extends BaseDataValueVector implements V private final Accessor accessor = new Accessor(); private final Mutator mutator = new Mutator(); - private int allocationTotalByteCount = 40000; + private int allocationTotalByteCount = 32768; private int allocationMonitor = 0; public ${minor.class}Vector(MaterializedField field, BufferAllocator allocator) { @@ -229,11 +229,11 @@ public final class ${minor.class}Vector extends BaseDataValueVector implements V @Override public boolean allocateNewSafe() { clear(); - if (allocationMonitor > 5) { - allocationTotalByteCount = Math.max(1, (int) (allocationTotalByteCount * 0.9)); + if (allocationMonitor > 10) { + allocationTotalByteCount = Math.max(8, (int) (allocationTotalByteCount / 2)); allocationMonitor = 0; - } else if (allocationMonitor < -5) { - allocationTotalByteCount = (int) (allocationTotalByteCount * 1.1); + } else if (allocationMonitor < -2) { + allocationTotalByteCount = (int) (allocationTotalByteCount * 2); allocationMonitor = 0; } data = allocator.buffer(allocationTotalByteCount); @@ -254,6 +254,7 @@ public final class ${minor.class}Vector extends BaseDataValueVector implements V assert totalBytes >= 0; data = allocator.buffer(totalBytes); data.readerIndex(0); + allocationTotalByteCount = totalBytes; offsetVector.allocateNew(valueCount+1); offsetVector.zeroVector(); } @@ -359,7 +360,6 @@ public final class ${minor.class}Vector extends BaseDataValueVector implements V public boolean setSafe(int index, byte[] bytes) { assert index >= 0; - if(index >= getValueCapacity()) return false; int currentOffset = offsetVector.getAccessor().get(index); if (data.capacity() < currentOffset + bytes.length) { @@ -391,7 +391,6 @@ public final class ${minor.class}Vector extends BaseDataValueVector implements V public boolean setSafe(int index, byte[] bytes, int start, int length) { assert index >= 0; - if(index >= getValueCapacity()) return false; int currentOffset = offsetVector.getAccessor().get(index); @@ -409,8 +408,7 @@ public final class ${minor.class}Vector extends BaseDataValueVector implements V public boolean setSafe(int index, Nullable${minor.class}Holder holder){ assert holder.isSet == 1; - if(index >= getValueCapacity()) return false; - + int start = holder.start; int end = holder.end; int len = end - start; @@ -433,8 +431,7 @@ public final class ${minor.class}Vector extends BaseDataValueVector implements V } public boolean setSafe(int index, ${minor.class}Holder holder){ - if(index >= getValueCapacity()) return false; - + int start = holder.start; int end = holder.end; int len = end - start; @@ -483,8 +480,10 @@ public final class ${minor.class}Vector extends BaseDataValueVector implements V ${minor.class}Vector.this.valueCount = valueCount; int idx = offsetVector.getAccessor().get(valueCount); data.writerIndex(idx); - if (((float) currentByteCapacity) / idx > 1.1) { + if (valueCount > 0 && currentByteCapacity > idx * 2) { allocationMonitor++; + } else if (allocationMonitor > 0) { + allocationMonitor--; } if (data instanceof AccountingByteBuf) { data.capacity(idx); http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/2712c3c3/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/svremover/RemovingRecordBatch.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/svremover/RemovingRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/svremover/RemovingRecordBatch.java index f3388dc..77582c3 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/svremover/RemovingRecordBatch.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/svremover/RemovingRecordBatch.java @@ -49,6 +49,7 @@ public class RemovingRecordBatch extends AbstractSingleRecordBatch<SelectionVect private int recordCount; private boolean hasRemainder; private int remainderIndex; + private boolean first; public RemovingRecordBatch(SelectionVectorRemover popConfig, FragmentContext context, RecordBatch incoming) throws OutOfMemoryException { super(popConfig, context, incoming); @@ -93,9 +94,10 @@ public class RemovingRecordBatch extends AbstractSingleRecordBatch<SelectionVect @Override protected void doWork() { - recordCount = incoming.getRecordCount(); - int copiedRecords = copier.copyRecords(0, recordCount); - if (copiedRecords < recordCount) { + int incomingRecordCount = incoming.getRecordCount(); + int copiedRecords = copier.copyRecords(0, incomingRecordCount); + + if (copiedRecords < incomingRecordCount) { for(VectorWrapper<?> v : container){ ValueVector.Mutator m = v.getValueVector().getMutator(); m.setValueCount(copiedRecords); @@ -104,6 +106,7 @@ public class RemovingRecordBatch extends AbstractSingleRecordBatch<SelectionVect remainderIndex = copiedRecords; this.recordCount = remainderIndex; } else { + recordCount = copiedRecords; for(VectorWrapper<?> v : container){ ValueVector.Mutator m = v.getValueVector().getMutator(); m.setValueCount(recordCount); @@ -118,16 +121,37 @@ public class RemovingRecordBatch extends AbstractSingleRecordBatch<SelectionVect } } - logger.debug(String.format("doWork(): %s records copied for out of %s, remaining: %s, incoming schema %s ", + assert recordCount >= copiedRecords; + logger.debug("doWork(): {} records copied out of {}, remaining: {}, incoming schema {} ", copiedRecords, - incoming.getRecordCount(), - incoming.getRecordCount() - remainderIndex, - incoming.getSchema())); + incomingRecordCount, + incomingRecordCount - remainderIndex, + incoming.getSchema()); } private void handleRemainder() { + int recordCount = incoming.getRecordCount(); int remainingRecordCount = incoming.getRecordCount() - remainderIndex; - int copiedRecords = copier.copyRecords(remainderIndex, remainingRecordCount); + int copiedRecords; + while((copiedRecords = copier.copyRecords(remainderIndex, remainingRecordCount)) == 0) { + logger.debug("Copied zero records. Retrying"); + container.zeroVectors(); + } + + /* + StringBuilder builder = new StringBuilder(); + for (VectorWrapper w : container) { + builder.append(w.getField().getPath()); + builder.append(" Value capacity: "); + builder.append(w.getValueVector().getValueCapacity()); + if (w.getValueVector() instanceof VariableWidthVector) { + builder.append(" Byte capacity: "); + builder.append(((VariableWidthVector) w.getValueVector()).getByteCapacity()); + builder.append("\n"); + } + } + logger.debug(builder.toString()); + */ if (copiedRecords < remainingRecordCount) { for(VectorWrapper<?> v : container){ @@ -150,10 +174,10 @@ public class RemovingRecordBatch extends AbstractSingleRecordBatch<SelectionVect remainderIndex = 0; hasRemainder = false; } - if(logger.isDebugEnabled()) logger.debug(String.format("handleRemainder(): %s records copied for out of %s, remaining: %s, incoming schema %s ", + logger.debug(String.format("handleRemainder(): %s records copied out of %s, remaining: %s, incoming schema %s ", copiedRecords, - incoming.getRecordCount(), - incoming.getRecordCount() - remainderIndex, + recordCount, + recordCount - remainderIndex, incoming.getSchema())); } http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/2712c3c3/exec/java-exec/src/main/java/org/apache/drill/exec/record/AbstractSingleRecordBatch.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/record/AbstractSingleRecordBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/record/AbstractSingleRecordBatch.java index c5fdaeb..9473945 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/record/AbstractSingleRecordBatch.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/record/AbstractSingleRecordBatch.java @@ -44,6 +44,13 @@ public abstract class AbstractSingleRecordBatch<T extends PhysicalOperator> exte public IterOutcome innerNext() { IterOutcome upstream = next(incoming); if(first && upstream == IterOutcome.OK) upstream = IterOutcome.OK_NEW_SCHEMA; + if (!first && upstream == IterOutcome.OK && incoming.getRecordCount() == 0) { + do { + for (VectorWrapper w : incoming) { + w.clear(); + } + } while ((upstream = next(incoming)) == IterOutcome.OK && incoming.getRecordCount() == 0); + } first = false; switch(upstream){ case NONE: http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/2712c3c3/exec/java-exec/src/main/java/org/apache/drill/exec/store/text/DrillTextRecordReader.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/store/text/DrillTextRecordReader.java b/exec/java-exec/src/main/java/org/apache/drill/exec/store/text/DrillTextRecordReader.java index 5c3d381..20f458f 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/store/text/DrillTextRecordReader.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/store/text/DrillTextRecordReader.java @@ -63,6 +63,7 @@ public class DrillTextRecordReader implements RecordReader { private Text value; private int numCols = 0; private boolean redoRecord = false; + private boolean first = true; public DrillTextRecordReader(FileSplit split, FragmentContext context, char delimiter, List<SchemaPath> columns) { this.context = context; @@ -106,10 +107,12 @@ public class DrillTextRecordReader implements RecordReader { @Override public int next() { - AllocationHelper.allocate(vector, targetRecordCount, 50); + logger.debug("vector value capacity {}", vector.getValueCapacity()); + logger.debug("vector byte capacity {}", vector.getByteCapacity()); + int batchSize = 0; try { int recordCount = 0; - while (redoRecord || (recordCount < targetRecordCount && reader.next(key, value))) { + while (redoRecord || (batchSize < 200*1000 && reader.next(key, value))) { redoRecord = false; int start; int end = -1; @@ -126,9 +129,10 @@ public class DrillTextRecordReader implements RecordReader { end = value.getLength(); } if (numCols > 0 && i++ < columnIds.get(p)) { - if (!vector.getMutator().addSafe(recordCount, value.getBytes(), start + 1, start + 1)) { + if (!vector.getMutator().addSafe(recordCount, value.getBytes(), start + 1, 0)) { redoRecord = true; vector.getMutator().setValueCount(recordCount); + logger.debug("text scan batch size {}", batchSize); return recordCount; } continue; @@ -137,8 +141,10 @@ public class DrillTextRecordReader implements RecordReader { if (!vector.getMutator().addSafe(recordCount, value.getBytes(), start + 1, end - start - 1)) { redoRecord = true; vector.getMutator().setValueCount(recordCount); + logger.debug("text scan batch size {}", batchSize); return recordCount; } + batchSize += end - start; } recordCount++; } @@ -146,6 +152,7 @@ public class DrillTextRecordReader implements RecordReader { v.getMutator().setValueCount(recordCount); } vector.getMutator().setValueCount(recordCount); + logger.debug("text scan batch size {}", batchSize); return recordCount; } catch (IOException e) { cleanup();
