DRILL-1020: Fix bug in dynamic allocation
Project: http://git-wip-us.apache.org/repos/asf/incubator-drill/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-drill/commit/c7712f80 Tree: http://git-wip-us.apache.org/repos/asf/incubator-drill/tree/c7712f80 Diff: http://git-wip-us.apache.org/repos/asf/incubator-drill/diff/c7712f80 Branch: refs/heads/master Commit: c7712f8053a7bcf5028028db68a4a5580c442a00 Parents: c373a27 Author: Steven Phillips <[email protected]> Authored: Tue Jun 17 03:14:04 2014 -0700 Committer: Jacques Nadeau <[email protected]> Committed: Wed Jun 18 20:28:25 2014 -0700 ---------------------------------------------------------------------- .../codegen/templates/FixedValueVectors.java | 45 ++++++++----- .../codegen/templates/NullableValueVectors.java | 4 +- .../templates/VariableLengthVectors.java | 38 +++++++---- .../org/apache/drill/exec/vector/BitVector.java | 45 +++++++++---- .../exec/vector/TestAdaptiveAllocation.java | 67 ++++++++++++++++---- 5 files changed, 144 insertions(+), 55 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/c7712f80/exec/java-exec/src/main/codegen/templates/FixedValueVectors.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/codegen/templates/FixedValueVectors.java b/exec/java-exec/src/main/codegen/templates/FixedValueVectors.java index a83ec97..7ff7327 100644 --- a/exec/java-exec/src/main/codegen/templates/FixedValueVectors.java +++ b/exec/java-exec/src/main/codegen/templates/FixedValueVectors.java @@ -213,13 +213,24 @@ public final class ${minor.class}Vector extends BaseDataValueVector implements F public boolean copyFromSafe(int fromIndex, int thisIndex, ${minor.class}Vector from){ if(thisIndex >= getValueCapacity()) { - allocationMonitor--; + decrementAllocationMonitor(); return false; } copyFrom(fromIndex, thisIndex, from); return true; } + private void decrementAllocationMonitor() { + if (allocationMonitor > 0) { + allocationMonitor = 0; + } + --allocationMonitor; + } + + private void incrementAllocationMonitor() { + ++allocationMonitor; + } + public final class Accessor extends BaseValueVector.BaseAccessor{ final FieldReader reader = new ${minor.class}ReaderImpl(${minor.class}Vector.this); @@ -576,7 +587,7 @@ public final class ${minor.class}Vector extends BaseDataValueVector implements F public boolean setSafe(int index, <#if (type.width > 4)>${minor.javaType!type.javaType}<#else>int</#if> value) { if(index >= getValueCapacity()) { - allocationMonitor--; + decrementAllocationMonitor(); return false; } data.setBytes(index * ${type.width}, value, 0, ${type.width}); @@ -597,7 +608,7 @@ public final class ${minor.class}Vector extends BaseDataValueVector implements F public boolean setSafe(int index, ${minor.class}Holder holder){ if(index >= getValueCapacity()) { - allocationMonitor--; + decrementAllocationMonitor(); return false; } set(index, holder); @@ -606,7 +617,7 @@ public final class ${minor.class}Vector extends BaseDataValueVector implements F public boolean setSafe(int index, Nullable${minor.class}Holder holder){ if(index >= getValueCapacity()) { - allocationMonitor--; + decrementAllocationMonitor(); return false; } set(index, holder); @@ -629,7 +640,7 @@ public final class ${minor.class}Vector extends BaseDataValueVector implements F public boolean setSafe(int index, ${minor.class}Holder holder){ if(index >= getValueCapacity()) { - allocationMonitor--; + decrementAllocationMonitor(); return false; } set(index, holder); @@ -638,7 +649,7 @@ public final class ${minor.class}Vector extends BaseDataValueVector implements F public boolean setSafe(int index, Nullable${minor.class}Holder holder){ if(index >= getValueCapacity()) { - allocationMonitor--; + decrementAllocationMonitor(); return false; } set(index, holder); @@ -659,7 +670,7 @@ public final class ${minor.class}Vector extends BaseDataValueVector implements F public boolean setSafe(int index, ${minor.class}Holder holder){ if(index >= getValueCapacity()) { - allocationMonitor--; + decrementAllocationMonitor(); return false; } set(index, holder); @@ -668,7 +679,7 @@ public final class ${minor.class}Vector extends BaseDataValueVector implements F public boolean setSafe(int index, Nullable${minor.class}Holder holder){ if(index >= getValueCapacity()) { - allocationMonitor--; + decrementAllocationMonitor(); return false; } set(index, holder); @@ -708,7 +719,7 @@ public final class ${minor.class}Vector extends BaseDataValueVector implements F public boolean setSafe(int index, Nullable${minor.class}Holder holder){ if(index >= getValueCapacity()) { - allocationMonitor--; + decrementAllocationMonitor(); return false; } set(index, holder); @@ -717,7 +728,7 @@ public final class ${minor.class}Vector extends BaseDataValueVector implements F public boolean setSafe(int index, ${minor.class}Holder holder){ if(index >= getValueCapacity()) { - allocationMonitor--; + decrementAllocationMonitor(); return false; } set(index, holder); @@ -731,7 +742,7 @@ public final class ${minor.class}Vector extends BaseDataValueVector implements F public boolean setSafe(int index, ${minor.class}Holder holder){ if(index >= getValueCapacity()) { - allocationMonitor--; + decrementAllocationMonitor(); return false; } set(index, holder); @@ -762,7 +773,7 @@ public final class ${minor.class}Vector extends BaseDataValueVector implements F public boolean setSafe(int index, <#if (type.width >= 4)>${minor.javaType!type.javaType}<#else>int</#if> value) { if(index >= getValueCapacity()) { - allocationMonitor--; + decrementAllocationMonitor(); return false; } set(index, value); @@ -775,7 +786,7 @@ public final class ${minor.class}Vector extends BaseDataValueVector implements F public boolean setSafe(int index, ${minor.class}Holder holder){ if(index >= getValueCapacity()) { - allocationMonitor--; + decrementAllocationMonitor(); return false; } set(index, holder); @@ -788,7 +799,7 @@ public final class ${minor.class}Vector extends BaseDataValueVector implements F public boolean setSafe(int index, Nullable${minor.class}Holder holder){ if(index >= getValueCapacity()) { - allocationMonitor--; + decrementAllocationMonitor(); return false; } set(index, holder); @@ -816,10 +827,10 @@ public final class ${minor.class}Vector extends BaseDataValueVector implements F int currentValueCapacity = getValueCapacity(); ${minor.class}Vector.this.valueCount = valueCount; int idx = (${type.width} * valueCount); - if (valueCount > 0 && currentValueCapacity > idx * 2) { - allocationMonitor++; + if (valueCount > 0 && currentValueCapacity > valueCount * 2) { + incrementAllocationMonitor(); } else if (allocationMonitor > 0) { - allocationMonitor--; + allocationMonitor = 0; } data.writerIndex(idx); if (data instanceof AccountingByteBuf) { http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/c7712f80/exec/java-exec/src/main/codegen/templates/NullableValueVectors.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/codegen/templates/NullableValueVectors.java b/exec/java-exec/src/main/codegen/templates/NullableValueVectors.java index ce17418..f50aae8 100644 --- a/exec/java-exec/src/main/codegen/templates/NullableValueVectors.java +++ b/exec/java-exec/src/main/codegen/templates/NullableValueVectors.java @@ -303,7 +303,9 @@ public final class ${className} extends BaseValueVector implements <#if type.maj <#if type.major == "VarLen"> if(!mutator.fillEmpties(thisIndex)) return false; </#if> - return bits.copyFromSafe(fromIndex, thisIndex, from.bits) && values.copyFromSafe(fromIndex, thisIndex, from.values); + boolean b1 = bits.copyFromSafe(fromIndex, thisIndex, from.bits); + boolean b2 = values.copyFromSafe(fromIndex, thisIndex, from.values); + return b1 && b2; } http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/c7712f80/exec/java-exec/src/main/codegen/templates/VariableLengthVectors.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/codegen/templates/VariableLengthVectors.java b/exec/java-exec/src/main/codegen/templates/VariableLengthVectors.java index 22a668d..8535f99 100644 --- a/exec/java-exec/src/main/codegen/templates/VariableLengthVectors.java +++ b/exec/java-exec/src/main/codegen/templates/VariableLengthVectors.java @@ -168,11 +168,7 @@ public final class ${minor.class}Vector extends BaseDataValueVector implements V } public boolean copyFromSafe(int fromIndex, int thisIndex, ${minor.class}Vector from){ - if(thisIndex >= getValueCapacity()) { - allocationMonitor--; - return false; - } - + int start = from.offsetVector.getAccessor().get(fromIndex); int end = from.offsetVector.getAccessor().get(fromIndex+1); int len = end - start; @@ -180,10 +176,15 @@ public final class ${minor.class}Vector extends BaseDataValueVector implements V int outputStart = offsetVector.data.get${(minor.javaType!type.javaType)?cap_first}(thisIndex * ${type.width}); if(data.capacity() < outputStart + len) { - allocationMonitor--; + decrementAllocationMonitor(); return false; } - + + if (!offsetVector.getMutator().setSafe(thisIndex + 1, outputStart + len)) { + decrementAllocationMonitor(); + return false; + } + from.data.getBytes(start, data, outputStart, len); offsetVector.data.set${(minor.javaType!type.javaType)?cap_first}( (thisIndex+1) * ${type.width}, outputStart + len); @@ -259,6 +260,17 @@ public final class ${minor.class}Vector extends BaseDataValueVector implements V offsetVector.zeroVector(); } + private void decrementAllocationMonitor() { + if (allocationMonitor > 0) { + allocationMonitor = 0; + } + --allocationMonitor; + } + + private void incrementAllocationMonitor() { + ++allocationMonitor; + } + public Accessor getAccessor(){ return accessor; } @@ -363,7 +375,7 @@ public final class ${minor.class}Vector extends BaseDataValueVector implements V int currentOffset = offsetVector.getAccessor().get(index); if (data.capacity() < currentOffset + bytes.length) { - allocationMonitor--; + decrementAllocationMonitor(); return false; } if (!offsetVector.getMutator().setSafe(index + 1, currentOffset + bytes.length)) { @@ -395,7 +407,7 @@ public final class ${minor.class}Vector extends BaseDataValueVector implements V int currentOffset = offsetVector.getAccessor().get(index); if (data.capacity() < currentOffset + length) { - allocationMonitor--; + decrementAllocationMonitor(); return false; } if (!offsetVector.getMutator().setSafe(index + 1, currentOffset + length)) { @@ -416,7 +428,7 @@ public final class ${minor.class}Vector extends BaseDataValueVector implements V int outputStart = offsetVector.data.get${(minor.javaType!type.javaType)?cap_first}(index * ${type.width}); if(data.capacity() < outputStart + len) { - allocationMonitor--; + decrementAllocationMonitor(); return false; } @@ -439,7 +451,7 @@ public final class ${minor.class}Vector extends BaseDataValueVector implements V int outputStart = offsetVector.data.get${(minor.javaType!type.javaType)?cap_first}(index * ${type.width}); if(data.capacity() < outputStart + len) { - allocationMonitor--; + decrementAllocationMonitor(); return false; } @@ -481,9 +493,9 @@ public final class ${minor.class}Vector extends BaseDataValueVector implements V int idx = offsetVector.getAccessor().get(valueCount); data.writerIndex(idx); if (valueCount > 0 && currentByteCapacity > idx * 2) { - allocationMonitor++; + incrementAllocationMonitor(); } else if (allocationMonitor > 0) { - allocationMonitor--; + allocationMonitor = 0; } if (data instanceof AccountingByteBuf) { data.capacity(idx); http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/c7712f80/exec/java-exec/src/main/java/org/apache/drill/exec/vector/BitVector.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/vector/BitVector.java b/exec/java-exec/src/main/java/org/apache/drill/exec/vector/BitVector.java index e217ddb..73f97fe 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/vector/BitVector.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/vector/BitVector.java @@ -42,7 +42,7 @@ public final class BitVector extends BaseDataValueVector implements FixedWidthVe private final Accessor accessor = new Accessor(); private final Mutator mutator = new Mutator(); - private int allocationValueCount = 4000; + private int allocationValueCount = 4096; private int allocationMonitor = 0; private int valueCapacity; @@ -73,11 +73,11 @@ public final class BitVector extends BaseDataValueVector implements FixedWidthVe public boolean allocateNewSafe() { clear(); - if (allocationMonitor > 5) { - allocationValueCount = Math.max(1, (int)(allocationValueCount * 0.9)); + if (allocationMonitor > 10) { + allocationValueCount = Math.max(8, (int) (allocationValueCount / 2)); allocationMonitor = 0; - } else if (allocationMonitor < -5) { - allocationValueCount = (int) (allocationValueCount * 1.1); + } else if (allocationMonitor < -2) { + allocationValueCount = (int) (allocationValueCount * 2); allocationMonitor = 0; } @@ -127,7 +127,10 @@ public final class BitVector extends BaseDataValueVector implements FixedWidthVe } public boolean copyFromSafe(int inIndex, int outIndex, BitVector from){ - if(outIndex >= this.getValueCapacity()) return false; + if(outIndex >= this.getValueCapacity()) { + decrementAllocationMonitor(); + return false; + } copyFrom(inIndex, outIndex, from); return true; } @@ -231,6 +234,17 @@ public final class BitVector extends BaseDataValueVector implements FixedWidthVe } } + private void decrementAllocationMonitor() { + if (allocationMonitor > 0) { + allocationMonitor = 0; + } + --allocationMonitor; + } + + private void incrementAllocationMonitor() { + ++allocationMonitor; + } + public class Accessor extends BaseAccessor { /** @@ -318,7 +332,7 @@ public final class BitVector extends BaseDataValueVector implements FixedWidthVe public boolean setSafe(int index, int value) { if(index >= getValueCapacity()) { - allocationMonitor--; + decrementAllocationMonitor(); return false; } set(index, value); @@ -326,22 +340,31 @@ public final class BitVector extends BaseDataValueVector implements FixedWidthVe } public boolean setSafe(int index, BitHolder holder) { - if(index >= getValueCapacity()) return false; + if(index >= getValueCapacity()) { + decrementAllocationMonitor(); + return false; + } set(index, holder.value); return true; } public boolean setSafe(int index, NullableBitHolder holder) { - if(index >= getValueCapacity()) return false; + if(index >= getValueCapacity()) { + decrementAllocationMonitor(); + return false; + } set(index, holder.value); return true; } public final void setValueCount(int valueCount) { + int currentValueCapacity = getValueCapacity(); BitVector.this.valueCount = valueCount; int idx = getSizeFromCount(valueCount); - if (((float) data.capacity()) / idx > 1.1) { - allocationMonitor++; + if (valueCount > 0 && currentValueCapacity > valueCount * 2) { + incrementAllocationMonitor(); + } else if (allocationMonitor > 0) { + allocationMonitor = 0; } data.writerIndex(idx); if (data instanceof AccountingByteBuf) { http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/c7712f80/exec/java-exec/src/test/java/org/apache/drill/exec/vector/TestAdaptiveAllocation.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/vector/TestAdaptiveAllocation.java b/exec/java-exec/src/test/java/org/apache/drill/exec/vector/TestAdaptiveAllocation.java index d86b5db..f554e3f 100644 --- a/exec/java-exec/src/test/java/org/apache/drill/exec/vector/TestAdaptiveAllocation.java +++ b/exec/java-exec/src/test/java/org/apache/drill/exec/vector/TestAdaptiveAllocation.java @@ -29,43 +29,84 @@ import org.junit.Test; import java.util.Random; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + public class TestAdaptiveAllocation { @Test public void test() throws Exception { BufferAllocator allocator = new TopLevelAllocator(); MaterializedField field = MaterializedField.create("field", Types.required(MinorType.VARCHAR)); - VarBinaryVector varBinaryVector = new VarBinaryVector(field, allocator); + NullableVarBinaryVector vector1 = new NullableVarBinaryVector(field, allocator); + NullableVarCharVector vector2 = new NullableVarCharVector(field, allocator); + NullableBigIntVector vector3 = new NullableBigIntVector(field, allocator); Random rand = new Random(); // int valuesToWrite = rand.nextInt(4000) + 1000; // int bytesToWrite = rand.nextInt(100); - int valuesToWrite = 100; - int bytesToWrite = 1; + int valuesToWrite = 8000; + int bytesToWrite1 = 2; + int bytesToWrite2 = 200; // System.out.println("value: " + valuesToWrite); // System.out.println("bytes: " + bytesToWrite); - byte[] value = new byte[bytesToWrite]; + byte[] value1 = new byte[bytesToWrite1]; + byte[] value2 = new byte[bytesToWrite2]; + + NullableVarBinaryVector copyVector1 = new NullableVarBinaryVector(field, allocator); + NullableVarCharVector copyVector2 = new NullableVarCharVector(field, allocator); + NullableBigIntVector copyVector3 = new NullableBigIntVector(field, allocator); + + copyVector1.allocateNew(); + copyVector2.allocateNew(); + copyVector3.allocateNew(); + + copyVector1.getMutator().set(0, value1); + copyVector2.getMutator().set(0, value2); + copyVector3.getMutator().set(0, 100); for (int i = 0; i < 10000; i++) { - varBinaryVector.allocateNew(); + vector1.allocateNew(); + vector2.allocateNew(); + vector3.allocateNew(); // System.out.println("Value Capacity: " + varBinaryVector.getValueCapacity()); // System.out.println("Byte Capacity: " + varBinaryVector.getByteCapacity()); int offset = 0; int j = 0; - for (j = 0; j < valuesToWrite; j++) { - if (!varBinaryVector.getMutator().setSafe(j - offset, value)) { - varBinaryVector.getMutator().setValueCount(j - offset); + int toWrite = (int) valuesToWrite * (int) (2 + rand.nextGaussian()) / 2; + for (j = 0; j < toWrite; j += 1) { +// if (!(vector1.getMutator().setSafe(j - offset, value1, 0, value1.length) && +// vector2.getMutator().setSafe(j - offset, value2, 0 , value2.length) && +// vector3.getMutator().setSafe(j - offset, 100))) { + if (!(vector1.copyFromSafe(0, j - offset, copyVector1) && + vector2.copyFromSafe(0, j - offset, copyVector2) && + vector3.copyFromSafe(0, j - offset, copyVector3))) { + vector1.getMutator().setValueCount(j - offset); + vector2.getMutator().setValueCount(j - offset); + vector3.getMutator().setValueCount(j - offset); offset = j; - varBinaryVector.allocateNew(); + vector1.clear(); + vector2.clear(); + vector3.clear(); + vector1.allocateNew(); + vector2.allocateNew(); + vector3.allocateNew(); // System.out.println("Value Capacity: " + varBinaryVector.getValueCapacity()); // System.out.println("Byte Capacity: " + varBinaryVector.getByteCapacity()); } } - varBinaryVector.getMutator().setValueCount(j - offset); + vector1.getMutator().setValueCount(j - offset); + vector2.getMutator().setValueCount(j - offset); + vector3.getMutator().setValueCount(j - offset); } - varBinaryVector.allocateNew(); - System.out.println(varBinaryVector.getValueCapacity()); - System.out.println(varBinaryVector.getByteCapacity()); + vector1.allocateNew(); + vector2.allocateNew(); + vector3.allocateNew(); + assertTrue(vector1.getValueCapacity() > 8000); + assertTrue(vector2.getValueCapacity() > 8000); + assertTrue(vector3.getValueCapacity() > 8000); + assertTrue(vector1.getByteCapacity() > 8000 * 2); + assertTrue(vector2.getByteCapacity() > 8000 * 200); } }
