DRILL-1270: Resize buffers used in workspace vectors for aggregate functions dynamically
Project: http://git-wip-us.apache.org/repos/asf/incubator-drill/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-drill/commit/450d891e Tree: http://git-wip-us.apache.org/repos/asf/incubator-drill/tree/450d891e Diff: http://git-wip-us.apache.org/repos/asf/incubator-drill/diff/450d891e Branch: refs/heads/master Commit: 450d891eb9cc9a77e537537e5d70ecb94de697b8 Parents: c6904f9 Author: Mehant Baid <[email protected]> Authored: Wed Aug 6 15:20:25 2014 -0700 Committer: Jacques Nadeau <[email protected]> Committed: Wed Aug 6 16:44:22 2014 -0700 ---------------------------------------------------------------------- .../src/main/codegen/data/AggrTypes1.tdd | 16 ++--- .../templates/VarCharAggrFunctions1.java | 70 +++++++++++--------- 2 files changed, 46 insertions(+), 40 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/450d891e/exec/java-exec/src/main/codegen/data/AggrTypes1.tdd ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/codegen/data/AggrTypes1.tdd b/exec/java-exec/src/main/codegen/data/AggrTypes1.tdd index d76d913..1bac07e 100644 --- a/exec/java-exec/src/main/codegen/data/AggrTypes1.tdd +++ b/exec/java-exec/src/main/codegen/data/AggrTypes1.tdd @@ -41,10 +41,10 @@ {inputType: "NullableIntervalYear", outputType: "IntervalYear", runningType: "IntervalYear", major: "Date", initialValue: "Integer.MAX_VALUE"}, {inputType: "Interval", outputType: "Interval", runningType: "BigInt", major: "Date", initialValue: "Long.MAX_VALUE"}, {inputType: "NullableInterval", outputType: "Interval", runningType: "BigInt", major: "Date", initialValue: "Long.MAX_VALUE"}, - {inputType: "VarChar", outputType: "VarChar", runningType: "VarChar", major: "VarBytes", initialValue: "0xFF", bufferEnd: "65536"}, - {inputType: "NullableVarChar", outputType: "VarChar", runningType: "VarChar", major: "VarBytes", initialValue: "0xFF",bufferEnd: "65536"}, - {inputType: "VarBinary", outputType: "VarBinary", runningType: "VarBinary", major: "VarBytes", initialValue: "0xFF", bufferEnd: "65536"}, - {inputType: "NullableVarBinary", outputType: "VarBinary", runningType: "VarBinary", major: "VarBytes", initialValue: "0xFF", bufferEnd: "65536"} + {inputType: "VarChar", outputType: "VarChar", runningType: "VarChar", major: "VarBytes"}, + {inputType: "NullableVarChar", outputType: "VarChar", runningType: "VarChar", major: "VarBytes"}, + {inputType: "VarBinary", outputType: "VarBinary", runningType: "VarBinary", major: "VarBytes"}, + {inputType: "NullableVarBinary", outputType: "VarBinary", runningType: "VarBinary", major: "VarBytes"} ] }, {className: "Max", funcName: "max", types: [ @@ -72,10 +72,10 @@ {inputType: "NullableIntervalYear", outputType: "IntervalYear", runningType: "IntervalYear", major: "Date", initialValue: "Integer.MIN_VALUE"}, {inputType: "Interval", outputType: "Interval", runningType: "BigInt", major: "Date", initialValue: "Long.MIN_VALUE"}, {inputType: "NullableInterval", outputType: "Interval", runningType: "BigInt", major: "Date", initialValue: "Long.MIN_VALUE"}, - {inputType: "VarChar", outputType: "VarChar", runningType: "VarChar", major: "VarBytes", initialValue: "0x00", bufferEnd: "0"}, - {inputType: "NullableVarChar", outputType: "VarChar", runningType: "VarChar", major: "VarBytes", initialValue: "0x00", bufferEnd: "0"}, - {inputType: "VarBinary", outputType: "VarBinary", runningType: "VarBinary", major: "VarBytes", initialValue: "0x00", bufferEnd: "0"}, - {inputType: "NullableVarBinary", outputType: "VarBinary", runningType: "VarBinary", major: "VarBytes", initialValue: "0x00", bufferEnd: "0"} + {inputType: "VarChar", outputType: "VarChar", runningType: "VarChar", major: "VarBytes"}, + {inputType: "NullableVarChar", outputType: "VarChar", runningType: "VarChar", major: "VarBytes"}, + {inputType: "VarBinary", outputType: "VarBinary", runningType: "VarBinary", major: "VarBytes"}, + {inputType: "NullableVarBinary", outputType: "VarBinary", runningType: "VarBinary", major: "VarBytes"} ] }, {className: "Sum", funcName: "sum", types: [ http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/450d891e/exec/java-exec/src/main/codegen/templates/VarCharAggrFunctions1.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/codegen/templates/VarCharAggrFunctions1.java b/exec/java-exec/src/main/codegen/templates/VarCharAggrFunctions1.java index 08f5732..746d25b 100644 --- a/exec/java-exec/src/main/codegen/templates/VarCharAggrFunctions1.java +++ b/exec/java-exec/src/main/codegen/templates/VarCharAggrFunctions1.java @@ -67,12 +67,8 @@ public static class ${type.inputType}${aggrtype.className} implements DrillAggFu value = new ObjectHolder(); ${type.runningType}Holder tmp = new ${type.runningType}Holder(); tmp.start = 0; - tmp.end = ${type.bufferEnd}; - io.netty.buffer.ByteBuf buffer = io.netty.buffer.Unpooled.wrappedBuffer(new byte [65536]); - for (int i = 0; i < 65536; i++) { - buffer.setByte(i, ${type.initialValue}); - } - tmp.buffer = buffer; + tmp.end = 0; + tmp.buffer = null; value.obj = tmp; <#else> @@ -95,33 +91,44 @@ public static class ${type.inputType}${aggrtype.className} implements DrillAggFu int cmp = 0; boolean swap = false; - // Compare the bytes - for (int l = in.start, r = tmp.start; l < in.end && r < tmp.end; l++, r++) { - byte leftByte = in.buffer.getByte(l); - byte rightByte = tmp.buffer.getByte(r); - if (leftByte != rightByte) { - cmp = ((leftByte & 0xFF) - (rightByte & 0xFF)) > 0 ? 1 : -1; - break; + // if buffer is null then swap + if (tmp.buffer == null) { + swap = true; + } else { + // Compare the bytes + for (int l = in.start, r = tmp.start; l < in.end && r < tmp.end; l++, r++) { + byte leftByte = in.buffer.getByte(l); + byte rightByte = tmp.buffer.getByte(r); + if (leftByte != rightByte) { + cmp = ((leftByte & 0xFF) - (rightByte & 0xFF)) > 0 ? 1 : -1; + break; + } } - } - if (cmp == 0) { - int l = (in.end - in.start) - (tmp.end - tmp.start); - if (l > 0) { - cmp = 1; - } else { - cmp = -1; + if (cmp == 0) { + int l = (in.end - in.start) - (tmp.end - tmp.start); + if (l > 0) { + cmp = 1; + } else { + cmp = -1; + } } - } - - <#if aggrtype.className == "Min"> - swap = (cmp == -1); - <#elseif aggrtype.className == "Max"> - swap = (cmp == 1); - </#if> + <#if aggrtype.className == "Min"> + swap = (cmp == -1); + <#elseif aggrtype.className == "Max"> + swap = (cmp == 1); + </#if> + } if (swap) { int length = in.end - in.start; + if (length > (tmp.end - tmp.start)) { + // if workspace buffer is smaller, release and allocate a new one + if (tmp.buffer != null) { + tmp.buffer.release(); + } + tmp.buffer = io.netty.buffer.Unpooled.wrappedBuffer(new byte [length]); + } in.buffer.getBytes(in.start, tmp.buffer, 0, length); tmp.end = length; } @@ -151,12 +158,11 @@ public static class ${type.inputType}${aggrtype.className} implements DrillAggFu value = new ObjectHolder(); ${type.runningType}Holder tmp = new ${type.runningType}Holder(); tmp.start = 0; - tmp.end = ${type.bufferEnd}; - io.netty.buffer.ByteBuf buffer = io.netty.buffer.Unpooled.wrappedBuffer(new byte [65536]); - for (int i = 0; i < 65536; i++) { - buffer.setByte(i, ${type.initialValue}); + tmp.end = 0; + if (tmp.buffer != null) { + tmp.buffer.release(); + tmp.buffer = null; } - tmp.buffer = buffer; value.obj = tmp; <#else> value = new ${type.runningType}Holder();
