http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/5c5cef06/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/impl/StringFunctions.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/impl/StringFunctions.java b/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/impl/StringFunctions.java index fcb3b77..0fa0da2 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/impl/StringFunctions.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/impl/StringFunctions.java @@ -19,6 +19,11 @@ package org.apache.drill.exec.expr.fn.impl; import io.netty.buffer.ByteBuf; +import io.netty.buffer.DrillBuf; + +import java.nio.charset.Charset; + +import javax.inject.Inject; import org.apache.drill.exec.expr.DrillSimpleFunc; import org.apache.drill.exec.expr.annotations.FunctionTemplate; @@ -29,12 +34,11 @@ import org.apache.drill.exec.expr.annotations.Param; import org.apache.drill.exec.expr.annotations.Workspace; import org.apache.drill.exec.expr.holders.BigIntHolder; import org.apache.drill.exec.expr.holders.BitHolder; +import org.apache.drill.exec.expr.holders.IntHolder; +import org.apache.drill.exec.expr.holders.NullableVarCharHolder; import org.apache.drill.exec.expr.holders.VarBinaryHolder; import org.apache.drill.exec.expr.holders.VarCharHolder; -import org.apache.drill.exec.expr.holders.NullableVarCharHolder; import org.apache.drill.exec.record.RecordBatch; -import java.nio.charset.Charset; -import org.apache.drill.exec.expr.holders.IntHolder; public class StringFunctions{ static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(StringFunctions.class); @@ -51,14 +55,17 @@ public class StringFunctions{ @Param VarCharHolder input; @Param(constant=true) VarCharHolder pattern; @Output BitHolder out; - @Workspace java.util.regex.Pattern regPattern; + @Workspace java.util.regex.Matcher matcher; public void setup(RecordBatch incoming){ - regPattern = java.util.regex.Pattern.compile(org.apache.drill.exec.expr.fn.impl.RegexpUtil.sqlToRegexLike(pattern.toString())); + matcher = java.util.regex.Pattern.compile(org.apache.drill.exec.expr.fn.impl.RegexpUtil.sqlToRegexLike( // + org.apache.drill.exec.expr.fn.impl.StringFunctionHelpers.toStringFromUTF8(pattern.start, pattern.end, pattern.buffer))).matcher(""); } public void eval(){ - out.value = regPattern.matcher(input.toString()).matches()? 1:0; + String i = org.apache.drill.exec.expr.fn.impl.StringFunctionHelpers.toStringFromUTF8(input.start, input.end, input.buffer); + matcher.reset(i); + out.value = matcher.matches()? 1:0; } } @@ -67,14 +74,17 @@ public class StringFunctions{ @Param VarCharHolder input; @Param(constant=true) VarCharHolder pattern; @Output BitHolder out; - @Workspace java.util.regex.Pattern regPattern; + @Workspace java.util.regex.Matcher matcher; public void setup(RecordBatch incoming){ - regPattern = java.util.regex.Pattern.compile(org.apache.drill.exec.expr.fn.impl.RegexpUtil.sqlToRegexSimilar(pattern.toString())); + + matcher = java.util.regex.Pattern.compile(org.apache.drill.exec.expr.fn.impl.RegexpUtil.sqlToRegexSimilar(org.apache.drill.exec.expr.fn.impl.StringFunctionHelpers.toStringFromUTF8(pattern.start, pattern.end, pattern.buffer))).matcher(""); } public void eval(){ - out.value = regPattern.matcher(input.toString()).matches()? 1:0; + String i = org.apache.drill.exec.expr.fn.impl.StringFunctionHelpers.toStringFromUTF8(input.start, input.end, input.buffer); + matcher.reset(i); + out.value = matcher.matches()? 1:0; } } @@ -87,20 +97,21 @@ public class StringFunctions{ @Param VarCharHolder input; @Param(constant=true) VarCharHolder pattern; @Param VarCharHolder replacement; - @Workspace ByteBuf buffer; - @Workspace java.util.regex.Pattern regPattern; + @Inject DrillBuf buffer; + @Workspace java.util.regex.Matcher matcher; @Output VarCharHolder out; public void setup(RecordBatch incoming){ - buffer = io.netty.buffer.Unpooled.wrappedBuffer(new byte [8000]); - regPattern = java.util.regex.Pattern.compile(pattern.toString()); + matcher = java.util.regex.Pattern.compile(org.apache.drill.exec.expr.fn.impl.StringFunctionHelpers.toStringFromUTF8(pattern.start, pattern.end, pattern.buffer)).matcher(""); } public void eval(){ - out.buffer = buffer; - out.start = 0; - byte [] bytea = regPattern.matcher(input.toString()).replaceAll(replacement.toString()).getBytes(java.nio.charset.Charset.forName("UTF-8")); + out.start = 0; + String i = org.apache.drill.exec.expr.fn.impl.StringFunctionHelpers.toStringFromUTF8(input.start, input.end, input.buffer); + String r = org.apache.drill.exec.expr.fn.impl.StringFunctionHelpers.toStringFromUTF8(replacement.start, replacement.end, replacement.buffer); + byte [] bytea = matcher.reset(i).replaceAll(r).getBytes(java.nio.charset.Charset.forName("UTF-8")); + out.buffer = buffer = buffer.reallocIfNeeded(bytea.length); out.buffer.setBytes(out.start, bytea); out.end = bytea.length; } @@ -224,14 +235,13 @@ public class StringFunctions{ @Param VarCharHolder input; @Output VarCharHolder out; - @Workspace ByteBuf buffer; + @Inject DrillBuf buffer; public void setup(RecordBatch incoming){ - buffer = io.netty.buffer.Unpooled.wrappedBuffer(new byte [8000]); } public void eval(){ - out.buffer = buffer; + out.buffer = buffer = buffer.reallocIfNeeded(input.end- input.start); out.start = 0; out.end = input.end - input.start; @@ -256,14 +266,13 @@ public class StringFunctions{ @Param VarCharHolder input; @Output VarCharHolder out; - @Workspace ByteBuf buffer; + @Inject DrillBuf buffer; public void setup(RecordBatch incoming){ - buffer = io.netty.buffer.Unpooled.wrappedBuffer(new byte [8000]); } public void eval() { - out.buffer = buffer; + out.buffer = buffer = buffer.reallocIfNeeded(input.end- input.start); out.start = 0; out.end = input.end - input.start; @@ -446,51 +455,16 @@ public class StringFunctions{ @Param VarCharHolder input; @Output VarCharHolder out; - @Workspace ByteBuf buffer; + @Inject DrillBuf buffer; public void setup(RecordBatch incoming){ - buffer = io.netty.buffer.Unpooled.wrappedBuffer(new byte [8000]); } public void eval() { - out.buffer = buffer; + out.buffer = buffer = buffer.reallocIfNeeded(input.end - input.start); out.start = 0; out.end = input.end - input.start; - - // Assumes Alpha as [A-Za-z0-9] - // white space is treated as everything else. - boolean capNext = true; - for (int id = input.start; id < input.end; id++) { - byte currentByte = input.buffer.getByte(id); - - // 'A - Z' : 0x41 - 0x5A - // 'a - z' : 0x61 - 0x7A - // '0-9' : 0x30 - 0x39 - if (capNext) { // curCh is whitespace or first character of word. - if (currentByte >= 0x30 && currentByte <= 0x39) { // 0-9 - capNext = false; - } else if (currentByte >=0x41 && currentByte <= 0x5A) { // A-Z - capNext = false; - } else if (currentByte >= 0x61 && currentByte <= 0x7A) { // a-z - capNext = false; - currentByte -= 0x20; // Uppercase this character - } - // else {} whitespace - } else { // Inside of a word or white space after end of word. - if (currentByte >= 0x30 && currentByte <= 0x39) { // 0-9 - // noop - } else if (currentByte >=0x41 && currentByte <= 0x5A) { // A-Z - currentByte -= 0x20 ; // Lowercase this character - } else if (currentByte >= 0x61 && currentByte <= 0x7A) { // a-z - // noop - } else { // whitespace - capNext = true; - } - } - - out.buffer.setByte(id - input.start, currentByte) ; - } //end of for_loop - + org.apache.drill.exec.expr.fn.impl.StringFunctionHelpers.initCap(input.start, input.end, input.buffer, out.buffer); } } @@ -502,11 +476,11 @@ public class StringFunctions{ @Param VarCharHolder text; @Param VarCharHolder from; @Param VarCharHolder to; - @Workspace ByteBuf buffer; + @Inject DrillBuf buffer; @Output VarCharHolder out; public void setup(RecordBatch incoming){ - buffer = io.netty.buffer.Unpooled.wrappedBuffer(new byte [8000]); + buffer = buffer.reallocIfNeeded(8000); } public void eval(){ @@ -566,15 +540,15 @@ public class StringFunctions{ @Param VarCharHolder text; @Param BigIntHolder length; @Param VarCharHolder fill; - @Workspace ByteBuf buffer; + @Inject DrillBuf buffer; @Output VarCharHolder out; public void setup(RecordBatch incoming){ - buffer = io.netty.buffer.Unpooled.wrappedBuffer(new byte [8000]); } public void eval() { + buffer = buffer.reallocIfNeeded((int) length.value*2); byte currentByte = 0; int id = 0; //get the char length of text. @@ -637,15 +611,16 @@ public class StringFunctions{ @Param VarCharHolder text; @Param BigIntHolder length; @Param VarCharHolder fill; - @Workspace ByteBuf buffer; + @Inject DrillBuf buffer; @Output VarCharHolder out; public void setup(RecordBatch incoming){ - buffer = io.netty.buffer.Unpooled.wrappedBuffer(new byte [8000]); } public void eval() { + buffer = buffer.reallocIfNeeded((int) length.value*2); + byte currentByte = 0; int id = 0; //get the char length of text. @@ -771,10 +746,10 @@ public class StringFunctions{ */ @FunctionTemplate(name = "btrim", scope = FunctionScope.SIMPLE, nulls = NullHandling.NULL_IF_NULL) public static class Btrim implements DrillSimpleFunc{ - + @Param VarCharHolder text; @Param VarCharHolder from; - + @Output VarCharHolder out; public void setup(RecordBatch incoming){ @@ -784,14 +759,14 @@ public class StringFunctions{ out.buffer = text.buffer; out.start = out.end = text.start; int bytePerChar = 0; - + //Scan from left of "text", stop until find a char not in "from" for (int id = text.start; id < text.end; id += bytePerChar) { bytePerChar = org.apache.drill.exec.expr.fn.impl.StringFunctionUtil.utf8CharLen(text.buffer, id); int pos = org.apache.drill.exec.expr.fn.impl.StringFunctionUtil.stringLeftMatchUTF8(from.buffer, from.start, from.end, text.buffer, id, id + bytePerChar); if (pos < 0) { // Found the 1st char not in "from", stop - out.start = id; + out.start = id; break; } } @@ -803,7 +778,7 @@ public class StringFunctions{ int pos = org.apache.drill.exec.expr.fn.impl.StringFunctionUtil.stringLeftMatchUTF8(from.buffer, from.start, from.end, text.buffer, id, id + bytePerChar); if (pos < 0) { // Found the 1st char not in "from", stop - out.end = id+ bytePerChar; + out.end = id+ bytePerChar; break; } } @@ -818,15 +793,14 @@ public class StringFunctions{ @Param VarCharHolder left; @Param VarCharHolder right; @Output VarCharHolder out; - @Workspace ByteBuf buffer; + @Inject DrillBuf buffer; public void setup(RecordBatch incoming){ - buffer = io.netty.buffer.Unpooled.wrappedBuffer(new byte [8000]); } public void eval(){ - out.buffer = buffer; + out.buffer = buffer = buffer.reallocIfNeeded( (left.end - left.start) + (right.end - right.start)); out.start = out.end = 0; int id = 0; @@ -845,15 +819,14 @@ public class StringFunctions{ @Param VarCharHolder left; @Param NullableVarCharHolder right; @Output VarCharHolder out; - @Workspace ByteBuf buffer; + @Inject DrillBuf buffer; public void setup(RecordBatch incoming){ - buffer = io.netty.buffer.Unpooled.wrappedBuffer(new byte [8000]); } public void eval(){ - out.buffer = buffer; + out.buffer = buffer = buffer.reallocIfNeeded( (left.end - left.start) + (right.end - right.start));; out.start = out.end = 0; int id = 0; @@ -873,15 +846,14 @@ public class StringFunctions{ @Param NullableVarCharHolder left; @Param VarCharHolder right; @Output VarCharHolder out; - @Workspace ByteBuf buffer; + @Inject DrillBuf buffer; public void setup(RecordBatch incoming){ - buffer = io.netty.buffer.Unpooled.wrappedBuffer(new byte [8000]); } public void eval(){ - out.buffer = buffer; + out.buffer = buffer.reallocIfNeeded( (left.end - left.start) + (right.end - right.start)); out.start = out.end = 0; int id = 0; @@ -901,15 +873,14 @@ public class StringFunctions{ @Param NullableVarCharHolder left; @Param NullableVarCharHolder right; @Output VarCharHolder out; - @Workspace ByteBuf buffer; + @Inject DrillBuf buffer; public void setup(RecordBatch incoming){ - buffer = io.netty.buffer.Unpooled.wrappedBuffer(new byte [8000]); } public void eval(){ - out.buffer = buffer; + out.buffer = buffer.reallocIfNeeded( (left.end - left.start) + (right.end - right.start)); out.start = out.end = 0; int id = 0; @@ -969,14 +940,14 @@ public class StringFunctions{ @Param IntHolder in; @Output VarCharHolder out; - @Workspace ByteBuf buffer; + @Inject DrillBuf buf; public void setup(RecordBatch incoming) { - buffer = io.netty.buffer.Unpooled.wrappedBuffer(new byte [1]); + buf = buf.reallocIfNeeded(1); } public void eval() { - out.buffer = buffer; + out.buffer = buf; out.start = out.end = 0; out.buffer.setByte(0, in.value); ++out.end; @@ -992,24 +963,22 @@ public class StringFunctions{ @Param VarCharHolder in; @Param IntHolder nTimes; @Output VarCharHolder out; - @Workspace ByteBuf buffer; + @Inject DrillBuf buffer; public void setup(RecordBatch incoming) { } public void eval() { + final int len = in.end - in.start; int num = nTimes.value; - byte[] bytea = new byte [(in.end - in.start)*num]; - int index = 0; - while(num > 0){ - for (int id = in.start; id < in.end; id++){ - bytea[index++] = in.buffer.getByte(id); - } - num--; - } - out.buffer = io.netty.buffer.Unpooled.wrappedBuffer(bytea); + System.out.println(len + ":" + num); out.start = 0; - out.end = bytea.length; + out.buffer = buffer = buffer.reallocIfNeeded( len * num ); + for(int i =0; i < num; i++){ + in.buffer.getBytes(in.start, out.buffer, i * len, len); + } + out.end = len * num; + System.out.println(out.end); } } @@ -1023,9 +992,10 @@ public class StringFunctions{ @Param VarCharHolder enc; @Output VarCharHolder out; @Workspace Charset inCharset; + @Inject DrillBuf buffer; public void setup(RecordBatch incoming) { - inCharset = java.nio.charset.Charset.forName(enc.toString()); + inCharset = java.nio.charset.Charset.forName(org.apache.drill.exec.expr.fn.impl.StringFunctionHelpers.toStringFromUTF8(enc.start, enc.end, enc.buffer)); } public void eval() { @@ -1035,7 +1005,8 @@ public class StringFunctions{ bytea[index]=in.buffer.getByte(i); } byte[] outBytea = new String(bytea, inCharset).getBytes(com.google.common.base.Charsets.UTF_8); - out.buffer = io.netty.buffer.Unpooled.wrappedBuffer(outBytea); + out.buffer = buffer = buffer.reallocIfNeeded(outBytea.length); + out.buffer.setBytes(0, outBytea); out.start = 0; out.end = outBytea.length; } @@ -1049,31 +1020,31 @@ public class StringFunctions{ @Param VarCharHolder in; @Output VarCharHolder out; - @Workspace ByteBuf buffer; + @Inject DrillBuf buffer; public void setup(RecordBatch incoming) { } public void eval() { - int charlen = 0; + final int len = in.end - in.start; + out.start = 0; + out.end = len; + out.buffer = buffer = buffer.reallocIfNeeded(len); + int charlen = 0; - byte[] bytea = new byte [in.end - in.start]; - int index = in.end; - int innerindex = 0; + int index = in.end; + int innerindex = 0; - for (int id = in.start; id < in.end; id+=charlen){ - innerindex = charlen = org.apache.drill.exec.expr.fn.impl.StringFunctionUtil.utf8CharLen(in.buffer, id); + for (int id = in.start; id < in.end; id+=charlen){ + innerindex = charlen = org.apache.drill.exec.expr.fn.impl.StringFunctionUtil.utf8CharLen(in.buffer, id); while(innerindex > 0){ - bytea[index - innerindex] = in.buffer.getByte(id + (charlen - innerindex)); + out.buffer.setByte(index - innerindex, in.buffer.getByte(id + (charlen - innerindex))); innerindex-- ; } index -= charlen; - } - out.buffer = io.netty.buffer.Unpooled.wrappedBuffer(bytea); - out.start = 0; - out.end = bytea.length; } + } } }
http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/5c5cef06/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/impl/conv/BigIntBEConvertTo.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/impl/conv/BigIntBEConvertTo.java b/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/impl/conv/BigIntBEConvertTo.java index 696ea25..e645f4b 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/impl/conv/BigIntBEConvertTo.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/impl/conv/BigIntBEConvertTo.java @@ -17,7 +17,9 @@ */ package org.apache.drill.exec.expr.fn.impl.conv; -import io.netty.buffer.ByteBuf; +import io.netty.buffer.DrillBuf; + +import javax.inject.Inject; import org.apache.drill.exec.expr.DrillSimpleFunc; import org.apache.drill.exec.expr.annotations.FunctionTemplate; @@ -25,7 +27,6 @@ import org.apache.drill.exec.expr.annotations.FunctionTemplate.FunctionScope; import org.apache.drill.exec.expr.annotations.FunctionTemplate.NullHandling; import org.apache.drill.exec.expr.annotations.Output; import org.apache.drill.exec.expr.annotations.Param; -import org.apache.drill.exec.expr.annotations.Workspace; import org.apache.drill.exec.expr.holders.BigIntHolder; import org.apache.drill.exec.expr.holders.VarBinaryHolder; import org.apache.drill.exec.record.RecordBatch; @@ -35,11 +36,11 @@ public class BigIntBEConvertTo implements DrillSimpleFunc { @Param BigIntHolder in; @Output VarBinaryHolder out; - @Workspace ByteBuf buffer; + @Inject DrillBuf buffer; @Override public void setup(RecordBatch incoming) { - buffer = org.apache.drill.exec.util.ByteBufUtil.createBuffer(8); + buffer = buffer.reallocIfNeeded(8); } @Override http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/5c5cef06/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/impl/conv/BigIntConvertTo.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/impl/conv/BigIntConvertTo.java b/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/impl/conv/BigIntConvertTo.java index 8c76113..7f6c8d2 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/impl/conv/BigIntConvertTo.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/impl/conv/BigIntConvertTo.java @@ -18,7 +18,10 @@ ******************************************************************************/ package org.apache.drill.exec.expr.fn.impl.conv; +import javax.inject.Inject; + import io.netty.buffer.ByteBuf; +import io.netty.buffer.DrillBuf; import org.apache.drill.exec.expr.DrillSimpleFunc; import org.apache.drill.exec.expr.annotations.FunctionTemplate; @@ -36,11 +39,12 @@ public class BigIntConvertTo implements DrillSimpleFunc { @Param BigIntHolder in; @Output VarBinaryHolder out; - @Workspace ByteBuf buffer; + @Inject DrillBuf buffer; + @Override public void setup(RecordBatch incoming) { - buffer = org.apache.drill.exec.util.ByteBufUtil.createBuffer(8); + buffer = buffer.reallocIfNeeded(8); } @Override http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/5c5cef06/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/impl/conv/BigIntVLongConvertTo.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/impl/conv/BigIntVLongConvertTo.java b/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/impl/conv/BigIntVLongConvertTo.java index 78564f6..7a6f302 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/impl/conv/BigIntVLongConvertTo.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/impl/conv/BigIntVLongConvertTo.java @@ -17,7 +17,10 @@ */ package org.apache.drill.exec.expr.fn.impl.conv; +import javax.inject.Inject; + import io.netty.buffer.ByteBuf; +import io.netty.buffer.DrillBuf; import org.apache.drill.exec.expr.DrillSimpleFunc; import org.apache.drill.exec.expr.annotations.FunctionTemplate; @@ -35,14 +38,15 @@ public class BigIntVLongConvertTo implements DrillSimpleFunc { @Param BigIntHolder in; @Output VarBinaryHolder out; - @Workspace ByteBuf buffer; + @Inject DrillBuf buffer; + @Override public void setup(RecordBatch incoming) { /* Hadoop Variable length integer (represented in the same way as a long) * occupies between 1-9 bytes. */ - buffer = org.apache.drill.exec.util.ByteBufUtil.createBuffer(9); + buffer = buffer.reallocIfNeeded(9); } @Override http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/5c5cef06/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/impl/conv/BooleanByteConvertTo.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/impl/conv/BooleanByteConvertTo.java b/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/impl/conv/BooleanByteConvertTo.java index 0feb08b..c2a63b5 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/impl/conv/BooleanByteConvertTo.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/impl/conv/BooleanByteConvertTo.java @@ -18,7 +18,10 @@ ******************************************************************************/ package org.apache.drill.exec.expr.fn.impl.conv; +import javax.inject.Inject; + import io.netty.buffer.ByteBuf; +import io.netty.buffer.DrillBuf; import org.apache.drill.exec.expr.DrillSimpleFunc; import org.apache.drill.exec.expr.annotations.FunctionTemplate; @@ -36,11 +39,12 @@ public class BooleanByteConvertTo implements DrillSimpleFunc { @Param BitHolder in; @Output VarBinaryHolder out; - @Workspace ByteBuf buffer; + @Inject DrillBuf buffer; + @Override public void setup(RecordBatch incoming) { - buffer = org.apache.drill.exec.util.ByteBufUtil.createBuffer(1); + buffer = buffer.reallocIfNeeded(1); } @Override http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/5c5cef06/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/impl/conv/DateEpochBEConvertTo.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/impl/conv/DateEpochBEConvertTo.java b/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/impl/conv/DateEpochBEConvertTo.java index 9e2026a..4affa55 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/impl/conv/DateEpochBEConvertTo.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/impl/conv/DateEpochBEConvertTo.java @@ -17,7 +17,10 @@ */ package org.apache.drill.exec.expr.fn.impl.conv; +import javax.inject.Inject; + import io.netty.buffer.ByteBuf; +import io.netty.buffer.DrillBuf; import org.apache.drill.exec.expr.DrillSimpleFunc; import org.apache.drill.exec.expr.annotations.FunctionTemplate; @@ -35,11 +38,12 @@ public class DateEpochBEConvertTo implements DrillSimpleFunc { @Param DateHolder in; @Output VarBinaryHolder out; - @Workspace ByteBuf buffer; + @Inject DrillBuf buffer; + @Override public void setup(RecordBatch incoming) { - buffer = org.apache.drill.exec.util.ByteBufUtil.createBuffer(8); + buffer = buffer.reallocIfNeeded(8); } @Override http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/5c5cef06/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/impl/conv/DateEpochConvertTo.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/impl/conv/DateEpochConvertTo.java b/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/impl/conv/DateEpochConvertTo.java index 90fb319..adaba9f 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/impl/conv/DateEpochConvertTo.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/impl/conv/DateEpochConvertTo.java @@ -18,7 +18,10 @@ ******************************************************************************/ package org.apache.drill.exec.expr.fn.impl.conv; +import javax.inject.Inject; + import io.netty.buffer.ByteBuf; +import io.netty.buffer.DrillBuf; import org.apache.drill.exec.expr.DrillSimpleFunc; import org.apache.drill.exec.expr.annotations.FunctionTemplate; @@ -36,11 +39,12 @@ public class DateEpochConvertTo implements DrillSimpleFunc { @Param DateHolder in; @Output VarBinaryHolder out; - @Workspace ByteBuf buffer; + @Inject DrillBuf buffer; + @Override public void setup(RecordBatch incoming) { - buffer = org.apache.drill.exec.util.ByteBufUtil.createBuffer(8); + buffer = buffer.reallocIfNeeded(8); } @Override http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/5c5cef06/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/impl/conv/DoubleConvertTo.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/impl/conv/DoubleConvertTo.java b/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/impl/conv/DoubleConvertTo.java index 5487ee0..dd0c7f8 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/impl/conv/DoubleConvertTo.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/impl/conv/DoubleConvertTo.java @@ -18,7 +18,10 @@ ******************************************************************************/ package org.apache.drill.exec.expr.fn.impl.conv; +import javax.inject.Inject; + import io.netty.buffer.ByteBuf; +import io.netty.buffer.DrillBuf; import org.apache.drill.exec.expr.DrillSimpleFunc; import org.apache.drill.exec.expr.annotations.FunctionTemplate; @@ -36,11 +39,12 @@ public class DoubleConvertTo implements DrillSimpleFunc { @Param Float8Holder in; @Output VarBinaryHolder out; - @Workspace ByteBuf buffer; + @Inject DrillBuf buffer; + @Override public void setup(RecordBatch incoming) { - buffer = org.apache.drill.exec.util.ByteBufUtil.createBuffer(8); + buffer = buffer.reallocIfNeeded(8); } @Override http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/5c5cef06/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/impl/conv/FloatConvertTo.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/impl/conv/FloatConvertTo.java b/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/impl/conv/FloatConvertTo.java index ec06225..9175901 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/impl/conv/FloatConvertTo.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/impl/conv/FloatConvertTo.java @@ -18,7 +18,10 @@ ******************************************************************************/ package org.apache.drill.exec.expr.fn.impl.conv; +import javax.inject.Inject; + import io.netty.buffer.ByteBuf; +import io.netty.buffer.DrillBuf; import org.apache.drill.exec.expr.DrillSimpleFunc; import org.apache.drill.exec.expr.annotations.FunctionTemplate; @@ -36,11 +39,12 @@ public class FloatConvertTo implements DrillSimpleFunc { @Param Float4Holder in; @Output VarBinaryHolder out; - @Workspace ByteBuf buffer; + @Inject DrillBuf buffer; + @Override public void setup(RecordBatch incoming) { - buffer = org.apache.drill.exec.util.ByteBufUtil.createBuffer(4); + buffer = buffer.reallocIfNeeded(4); } @Override http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/5c5cef06/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/impl/conv/IntBEConvertTo.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/impl/conv/IntBEConvertTo.java b/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/impl/conv/IntBEConvertTo.java index 6de4f0a..45687f5 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/impl/conv/IntBEConvertTo.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/impl/conv/IntBEConvertTo.java @@ -17,7 +17,10 @@ */ package org.apache.drill.exec.expr.fn.impl.conv; +import javax.inject.Inject; + import io.netty.buffer.ByteBuf; +import io.netty.buffer.DrillBuf; import org.apache.drill.exec.expr.DrillSimpleFunc; import org.apache.drill.exec.expr.annotations.FunctionTemplate; @@ -35,11 +38,12 @@ public class IntBEConvertTo implements DrillSimpleFunc { @Param IntHolder in; @Output VarBinaryHolder out; - @Workspace ByteBuf buffer; + @Inject DrillBuf buffer; + @Override public void setup(RecordBatch incoming) { - buffer = org.apache.drill.exec.util.ByteBufUtil.createBuffer(4); + buffer = buffer.reallocIfNeeded(4); } @Override http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/5c5cef06/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/impl/conv/IntConvertTo.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/impl/conv/IntConvertTo.java b/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/impl/conv/IntConvertTo.java index c1f8977..0ef7700 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/impl/conv/IntConvertTo.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/impl/conv/IntConvertTo.java @@ -18,7 +18,10 @@ ******************************************************************************/ package org.apache.drill.exec.expr.fn.impl.conv; +import javax.inject.Inject; + import io.netty.buffer.ByteBuf; +import io.netty.buffer.DrillBuf; import org.apache.drill.exec.expr.DrillSimpleFunc; import org.apache.drill.exec.expr.annotations.FunctionTemplate; @@ -36,11 +39,12 @@ public class IntConvertTo implements DrillSimpleFunc { @Param IntHolder in; @Output VarBinaryHolder out; - @Workspace ByteBuf buffer; + @Inject DrillBuf buffer; + @Override public void setup(RecordBatch incoming) { - buffer = org.apache.drill.exec.util.ByteBufUtil.createBuffer(4); + buffer = buffer.reallocIfNeeded(4); } @Override http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/5c5cef06/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/impl/conv/IntVIntConvertTo.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/impl/conv/IntVIntConvertTo.java b/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/impl/conv/IntVIntConvertTo.java index 52680c8..be09245 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/impl/conv/IntVIntConvertTo.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/impl/conv/IntVIntConvertTo.java @@ -17,7 +17,10 @@ */ package org.apache.drill.exec.expr.fn.impl.conv; +import javax.inject.Inject; + import io.netty.buffer.ByteBuf; +import io.netty.buffer.DrillBuf; import org.apache.drill.exec.expr.DrillSimpleFunc; import org.apache.drill.exec.expr.annotations.FunctionTemplate; @@ -35,14 +38,15 @@ public class IntVIntConvertTo implements DrillSimpleFunc { @Param IntHolder in; @Output VarBinaryHolder out; - @Workspace ByteBuf buffer; + @Inject DrillBuf buffer; + @Override public void setup(RecordBatch incoming) { /* Hadoop Variable length integer (represented in the same way as a long) * occupies between 1-9 bytes. */ - buffer = org.apache.drill.exec.util.ByteBufUtil.createBuffer(9); + buffer = buffer.reallocIfNeeded(9); } @Override http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/5c5cef06/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/impl/conv/JsonConvertFrom.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/impl/conv/JsonConvertFrom.java b/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/impl/conv/JsonConvertFrom.java index d8df543..53b8f50 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/impl/conv/JsonConvertFrom.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/impl/conv/JsonConvertFrom.java @@ -22,7 +22,10 @@ package org.apache.drill.exec.expr.fn.impl.conv; import java.io.ByteArrayOutputStream; import java.io.StringReader; +import javax.inject.Inject; + import io.netty.buffer.ByteBuf; +import io.netty.buffer.DrillBuf; import org.apache.drill.exec.expr.DrillSimpleFunc; import org.apache.drill.exec.expr.annotations.FunctionTemplate; @@ -50,6 +53,8 @@ public class JsonConvertFrom { public static class ConvertFromJson implements DrillSimpleFunc{ @Param VarBinaryHolder in; + @Inject DrillBuf buffer; + @Output ComplexWriter writer; public void setup(RecordBatch incoming){ @@ -60,23 +65,24 @@ public class JsonConvertFrom { byte[] buf = new byte[in.end - in.start]; in.buffer.getBytes(in.start, buf, 0, in.end - in.start); String input = new String(buf, com.google.common.base.Charsets.UTF_8); - + try { - org.apache.drill.exec.vector.complex.fn.JsonReader jsonReader = new org.apache.drill.exec.vector.complex.fn.JsonReader(); + org.apache.drill.exec.vector.complex.fn.JsonReader jsonReader = new org.apache.drill.exec.vector.complex.fn.JsonReader(buffer); jsonReader.write(new java.io.StringReader(input), writer); - + } catch (Exception e) { System.out.println(" msg = " + e.getMessage() + " trace : " + e.getStackTrace()); } } } - + @FunctionTemplate(name = "convert_fromJSON", scope = FunctionScope.SIMPLE, nulls = NullHandling.NULL_IF_NULL, isRandom = true) public static class ConvertFromJsonVarchar implements DrillSimpleFunc{ @Param VarCharHolder in; @Output ComplexWriter writer; + @Inject DrillBuf buffer; public void setup(RecordBatch incoming){ } @@ -86,12 +92,12 @@ public class JsonConvertFrom { byte[] buf = new byte[in.end - in.start]; in.buffer.getBytes(in.start, buf, 0, in.end - in.start); String input = new String(buf, com.google.common.base.Charsets.UTF_8); - + try { - org.apache.drill.exec.vector.complex.fn.JsonReader jsonReader = new org.apache.drill.exec.vector.complex.fn.JsonReader(); + org.apache.drill.exec.vector.complex.fn.JsonReader jsonReader = new org.apache.drill.exec.vector.complex.fn.JsonReader(buffer); jsonReader.write(new java.io.StringReader(input), writer); - + } catch (Exception e) { System.out.println(" msg = " + e.getMessage() + " trace : " + e.getStackTrace()); } http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/5c5cef06/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/impl/conv/JsonConvertTo.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/impl/conv/JsonConvertTo.java b/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/impl/conv/JsonConvertTo.java index 8b0e811..c87bd5c 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/impl/conv/JsonConvertTo.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/impl/conv/JsonConvertTo.java @@ -18,25 +18,20 @@ package org.apache.drill.exec.expr.fn.impl.conv; -import java.io.ByteArrayOutputStream; +import io.netty.buffer.DrillBuf; -import io.netty.buffer.ByteBuf; +import javax.inject.Inject; import org.apache.drill.exec.expr.DrillSimpleFunc; import org.apache.drill.exec.expr.annotations.FunctionTemplate; -import org.apache.drill.exec.expr.annotations.Output; -import org.apache.drill.exec.expr.annotations.Param; -import org.apache.drill.exec.expr.annotations.Workspace; import org.apache.drill.exec.expr.annotations.FunctionTemplate.FunctionScope; import org.apache.drill.exec.expr.annotations.FunctionTemplate.NullHandling; +import org.apache.drill.exec.expr.annotations.Output; +import org.apache.drill.exec.expr.annotations.Param; import org.apache.drill.exec.expr.holders.VarBinaryHolder; -import org.apache.drill.exec.expr.holders.VarCharHolder; import org.apache.drill.exec.record.RecordBatch; -import org.apache.drill.exec.vector.complex.fn.JsonWriter; import org.apache.drill.exec.vector.complex.reader.FieldReader; -import com.google.common.base.Charsets; - public class JsonConvertTo { static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(JsonConvertTo.class); @@ -48,14 +43,12 @@ public class JsonConvertTo { @Param FieldReader input; @Output VarBinaryHolder out; - @Workspace ByteBuf buffer; + @Inject DrillBuf buffer; public void setup(RecordBatch incoming){ - buffer = org.apache.drill.exec.util.ByteBufUtil.createBuffer(256); } public void eval(){ - out.buffer = buffer; out.start = 0; java.io.ByteArrayOutputStream stream = new java.io.ByteArrayOutputStream(); @@ -69,12 +62,8 @@ public class JsonConvertTo { byte [] bytea = stream.toByteArray(); - if (bytea.length > buffer.capacity()) { - buffer = org.apache.drill.exec.util.ByteBufUtil.createBuffer(bytea.length); - out.buffer = buffer; - } - - out.buffer.setBytes(out.start, bytea); + out.buffer = buffer = buffer.reallocIfNeeded(bytea.length); + out.buffer.setBytes(0, bytea); out.end = bytea.length; } } http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/5c5cef06/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/impl/conv/SmallIntBEConvertTo.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/impl/conv/SmallIntBEConvertTo.java b/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/impl/conv/SmallIntBEConvertTo.java index 227932a..68c9f33 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/impl/conv/SmallIntBEConvertTo.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/impl/conv/SmallIntBEConvertTo.java @@ -18,7 +18,10 @@ ******************************************************************************/ package org.apache.drill.exec.expr.fn.impl.conv; +import javax.inject.Inject; + import io.netty.buffer.ByteBuf; +import io.netty.buffer.DrillBuf; import org.apache.drill.exec.expr.DrillSimpleFunc; import org.apache.drill.exec.expr.annotations.FunctionTemplate; @@ -36,11 +39,12 @@ public class SmallIntBEConvertTo implements DrillSimpleFunc { @Param SmallIntHolder in; @Output VarBinaryHolder out; - @Workspace ByteBuf buffer; + @Inject DrillBuf buffer; + @Override public void setup(RecordBatch incoming) { - buffer = org.apache.drill.exec.util.ByteBufUtil.createBuffer(2); + buffer = buffer.reallocIfNeeded(2); } @Override http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/5c5cef06/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/impl/conv/SmallIntConvertTo.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/impl/conv/SmallIntConvertTo.java b/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/impl/conv/SmallIntConvertTo.java index db74fad..2d97033 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/impl/conv/SmallIntConvertTo.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/impl/conv/SmallIntConvertTo.java @@ -18,7 +18,10 @@ ******************************************************************************/ package org.apache.drill.exec.expr.fn.impl.conv; +import javax.inject.Inject; + import io.netty.buffer.ByteBuf; +import io.netty.buffer.DrillBuf; import org.apache.drill.exec.expr.DrillSimpleFunc; import org.apache.drill.exec.expr.annotations.FunctionTemplate; @@ -36,11 +39,12 @@ public class SmallIntConvertTo implements DrillSimpleFunc { @Param SmallIntHolder in; @Output VarBinaryHolder out; - @Workspace ByteBuf buffer; + @Inject DrillBuf buffer; + @Override public void setup(RecordBatch incoming) { - buffer = org.apache.drill.exec.util.ByteBufUtil.createBuffer(2); + buffer = buffer.reallocIfNeeded(2); } @Override http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/5c5cef06/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/impl/conv/TimeEpochBEConvertTo.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/impl/conv/TimeEpochBEConvertTo.java b/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/impl/conv/TimeEpochBEConvertTo.java index e19850e..06cd085 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/impl/conv/TimeEpochBEConvertTo.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/impl/conv/TimeEpochBEConvertTo.java @@ -17,7 +17,10 @@ */ package org.apache.drill.exec.expr.fn.impl.conv; +import javax.inject.Inject; + import io.netty.buffer.ByteBuf; +import io.netty.buffer.DrillBuf; import org.apache.drill.exec.expr.DrillSimpleFunc; import org.apache.drill.exec.expr.annotations.FunctionTemplate; @@ -35,11 +38,12 @@ public class TimeEpochBEConvertTo implements DrillSimpleFunc { @Param TimeHolder in; @Output VarBinaryHolder out; - @Workspace ByteBuf buffer; + @Inject DrillBuf buffer; + @Override public void setup(RecordBatch incoming) { - buffer = org.apache.drill.exec.util.ByteBufUtil.createBuffer(8); + buffer = buffer.reallocIfNeeded(8); } @Override http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/5c5cef06/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/impl/conv/TimeEpochConvertTo.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/impl/conv/TimeEpochConvertTo.java b/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/impl/conv/TimeEpochConvertTo.java index 56b76e0..d3802e8 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/impl/conv/TimeEpochConvertTo.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/impl/conv/TimeEpochConvertTo.java @@ -18,7 +18,10 @@ ******************************************************************************/ package org.apache.drill.exec.expr.fn.impl.conv; +import javax.inject.Inject; + import io.netty.buffer.ByteBuf; +import io.netty.buffer.DrillBuf; import org.apache.drill.exec.expr.DrillSimpleFunc; import org.apache.drill.exec.expr.annotations.FunctionTemplate; @@ -36,11 +39,12 @@ public class TimeEpochConvertTo implements DrillSimpleFunc { @Param TimeHolder in; @Output VarBinaryHolder out; - @Workspace ByteBuf buffer; + @Inject DrillBuf buffer; + @Override public void setup(RecordBatch incoming) { - buffer = org.apache.drill.exec.util.ByteBufUtil.createBuffer(8); + buffer = buffer.reallocIfNeeded(8); } @Override http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/5c5cef06/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/impl/conv/TinyIntConvertTo.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/impl/conv/TinyIntConvertTo.java b/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/impl/conv/TinyIntConvertTo.java index afd1eb6..f4d582e 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/impl/conv/TinyIntConvertTo.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/impl/conv/TinyIntConvertTo.java @@ -18,7 +18,10 @@ ******************************************************************************/ package org.apache.drill.exec.expr.fn.impl.conv; +import javax.inject.Inject; + import io.netty.buffer.ByteBuf; +import io.netty.buffer.DrillBuf; import org.apache.drill.exec.expr.DrillSimpleFunc; import org.apache.drill.exec.expr.annotations.FunctionTemplate; @@ -37,11 +40,12 @@ public class TinyIntConvertTo implements DrillSimpleFunc { @Param TinyIntHolder in; @Output VarBinaryHolder out; - @Workspace ByteBuf buffer; + @Inject DrillBuf buffer; + @Override public void setup(RecordBatch incoming) { - buffer = org.apache.drill.exec.util.ByteBufUtil.createBuffer(1); + buffer = buffer.reallocIfNeeded(1); } @Override http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/5c5cef06/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/impl/conv/UInt8ConvertTo.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/impl/conv/UInt8ConvertTo.java b/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/impl/conv/UInt8ConvertTo.java index 0322d1a..4e6d606 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/impl/conv/UInt8ConvertTo.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/impl/conv/UInt8ConvertTo.java @@ -18,7 +18,10 @@ ******************************************************************************/ package org.apache.drill.exec.expr.fn.impl.conv; +import javax.inject.Inject; + import io.netty.buffer.ByteBuf; +import io.netty.buffer.DrillBuf; import org.apache.drill.exec.expr.DrillSimpleFunc; import org.apache.drill.exec.expr.annotations.FunctionTemplate; @@ -36,11 +39,12 @@ public class UInt8ConvertTo implements DrillSimpleFunc { @Param UInt8Holder in; @Output VarBinaryHolder out; - @Workspace ByteBuf buffer; + @Inject DrillBuf buffer; + @Override public void setup(RecordBatch incoming) { - buffer = org.apache.drill.exec.util.ByteBufUtil.createBuffer(8); + buffer = buffer.reallocIfNeeded(8); } @Override http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/5c5cef06/exec/java-exec/src/main/java/org/apache/drill/exec/memory/Accountor.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/memory/Accountor.java b/exec/java-exec/src/main/java/org/apache/drill/exec/memory/Accountor.java index cd08367..7cf445c 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/memory/Accountor.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/memory/Accountor.java @@ -17,7 +17,7 @@ */ package org.apache.drill.exec.memory; -import io.netty.buffer.AccountingByteBuf; +import io.netty.buffer.DrillBuf; import io.netty.buffer.ByteBuf; import java.util.Arrays; @@ -58,7 +58,7 @@ public class Accountor { } } - public boolean transferTo(Accountor target, AccountingByteBuf buf, long size){ + public boolean transferTo(Accountor target, DrillBuf buf, long size){ boolean withinLimit = target.forceAdditionalReservation(size); release(buf, size); @@ -96,7 +96,7 @@ public class Accountor { } } - public void reserved(long expected, AccountingByteBuf buf){ + public void reserved(long expected, DrillBuf buf){ // make sure to take away the additional memory that happened due to rounding. long additional = buf.capacity() - expected; @@ -108,7 +108,7 @@ public class Accountor { } - public void releasePartial(AccountingByteBuf buf, long size){ + public void releasePartial(DrillBuf buf, long size){ remainder.returnAllocation(size); if (ENABLE_ACCOUNTING) { if(buf != null){ @@ -122,7 +122,7 @@ public class Accountor { } } - public void release(AccountingByteBuf buf, long size) { + public void release(DrillBuf buf, long size) { remainder.returnAllocation(size); if (ENABLE_ACCOUNTING) { if(buf != null && buffers.remove(buf) == null) throw new IllegalStateException("Releasing a buffer that has already been released. Buffer: " + buf); http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/5c5cef06/exec/java-exec/src/main/java/org/apache/drill/exec/memory/BufferAllocator.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/memory/BufferAllocator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/memory/BufferAllocator.java index 38fe4f1..67a041d 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/memory/BufferAllocator.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/memory/BufferAllocator.java @@ -17,7 +17,7 @@ */ package org.apache.drill.exec.memory; -import io.netty.buffer.AccountingByteBuf; +import io.netty.buffer.DrillBuf; import io.netty.buffer.ByteBufAllocator; import java.io.Closeable; @@ -38,7 +38,7 @@ public interface BufferAllocator extends Closeable { * The size in bytes. * @return A new ByteBuf. */ - public abstract AccountingByteBuf buffer(int size); + public abstract DrillBuf buffer(int size); /** * Allocate a new or reused buffer within provided range. Note that the buffer may technically be larger than the @@ -48,7 +48,7 @@ public interface BufferAllocator extends Closeable { * @param maxSize The maximum size in bytes. * @return A new ByteBuf. */ - public abstract AccountingByteBuf buffer(int minSize, int maxSize); + public abstract DrillBuf buffer(int minSize, int maxSize); public abstract ByteBufAllocator getUnderlyingAllocator(); @@ -60,7 +60,7 @@ public interface BufferAllocator extends Closeable { * @param buf * @return false if over allocation. */ - public boolean takeOwnership(AccountingByteBuf buf) ; + public boolean takeOwnership(DrillBuf buf) ; public PreAllocator getNewPreAllocator(); @@ -71,7 +71,7 @@ public interface BufferAllocator extends Closeable { public interface PreAllocator { public boolean preAllocate(int bytes); - public AccountingByteBuf getAllocation(); + public DrillBuf getAllocation(); } /** @@ -91,4 +91,5 @@ public interface BufferAllocator extends Closeable { public abstract long getAllocatedMemory(); + public DrillBuf getEmpty(); } http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/5c5cef06/exec/java-exec/src/main/java/org/apache/drill/exec/memory/TopLevelAllocator.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/memory/TopLevelAllocator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/memory/TopLevelAllocator.java index d89c892..55f11a4 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/memory/TopLevelAllocator.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/memory/TopLevelAllocator.java @@ -17,11 +17,12 @@ */ package org.apache.drill.exec.memory; -import io.netty.buffer.AccountingByteBuf; +import io.netty.buffer.DrillBuf; import io.netty.buffer.ByteBuf; import io.netty.buffer.ByteBufAllocator; import io.netty.buffer.PooledByteBufAllocator; import io.netty.buffer.PooledByteBufAllocatorL; +import io.netty.buffer.UnsafeDirectLittleEndian; import java.util.HashMap; import java.util.IdentityHashMap; @@ -38,9 +39,11 @@ public class TopLevelAllocator implements BufferAllocator { private static final boolean ENABLE_ACCOUNTING = AssertionUtil.isAssertionsEnabled(); private final Map<ChildAllocator, StackTraceElement[]> childrenMap; - private final PooledByteBufAllocator innerAllocator = PooledByteBufAllocatorL.DEFAULT; + private final PooledByteBufAllocatorL innerAllocator = PooledByteBufAllocatorL.DEFAULT; private final Accountor acct; private final boolean errorOnLeak; + private final DrillBuf empty; + @Deprecated public TopLevelAllocator() { @@ -55,6 +58,7 @@ public class TopLevelAllocator implements BufferAllocator { private TopLevelAllocator(long maximumAllocation, boolean errorOnLeak){ this.errorOnLeak = errorOnLeak; this.acct = new Accountor(errorOnLeak, null, null, maximumAllocation, 0); + this.empty = DrillBuf.getEmpty(this, acct); this.childrenMap = ENABLE_ACCOUNTING ? new IdentityHashMap<ChildAllocator, StackTraceElement[]>() : null; } @@ -65,20 +69,21 @@ public class TopLevelAllocator implements BufferAllocator { } @Override - public boolean takeOwnership(AccountingByteBuf buf) { + public boolean takeOwnership(DrillBuf buf) { return buf.transferAccounting(acct); } - public AccountingByteBuf buffer(int min, int max) { + public DrillBuf buffer(int min, int max) { + if(min == 0) return empty; if(!acct.reserve(min)) return null; - ByteBuf buffer = innerAllocator.directBuffer(min, max); - AccountingByteBuf wrapped = new AccountingByteBuf(acct, buffer); + UnsafeDirectLittleEndian buffer = innerAllocator.directBuffer(min, max); + DrillBuf wrapped = new DrillBuf(this, acct, buffer); acct.reserved(min, wrapped); return wrapped; } @Override - public AccountingByteBuf buffer(int size) { + public DrillBuf buffer(int size) { return buffer(size, size); } @@ -124,8 +129,16 @@ public class TopLevelAllocator implements BufferAllocator { } - private class ChildAllocator implements BufferAllocator{ + @Override + public DrillBuf getEmpty() { + return empty; + } + + + + private class ChildAllocator implements BufferAllocator{ + private final DrillBuf empty; private Accountor childAcct; private Map<ChildAllocator, StackTraceElement[]> children = new HashMap<>(); private boolean closed = false; @@ -137,27 +150,29 @@ public class TopLevelAllocator implements BufferAllocator { childAcct = new Accountor(errorOnLeak, handle, parentAccountor, max, pre); this.handle = handle; thisMap = map; + this.empty = DrillBuf.getEmpty(this, childAcct); } @Override - public boolean takeOwnership(AccountingByteBuf buf) { + public boolean takeOwnership(DrillBuf buf) { return buf.transferAccounting(childAcct); } @Override - public AccountingByteBuf buffer(int size, int max) { + public DrillBuf buffer(int size, int max) { + if(size == 0) return empty; if(!childAcct.reserve(size)){ logger.warn("Unable to allocate buffer of size {} due to memory limit. Current allocation: {}", size, getAllocatedMemory()); return null; }; - ByteBuf buffer = innerAllocator.directBuffer(size, max); - AccountingByteBuf wrapped = new AccountingByteBuf(childAcct, buffer); + UnsafeDirectLittleEndian buffer = innerAllocator.directBuffer(size, max); + DrillBuf wrapped = new DrillBuf(this, childAcct, buffer); childAcct.reserved(buffer.capacity(), wrapped); return wrapped; } - public AccountingByteBuf buffer(int size) { + public DrillBuf buffer(int size) { return buffer(size, size); } @@ -179,7 +194,7 @@ public class TopLevelAllocator implements BufferAllocator { } public PreAllocator getNewPreAllocator(){ - return new PreAlloc(this.childAcct); + return new PreAlloc(this, this.childAcct); } @Override @@ -221,17 +236,25 @@ public class TopLevelAllocator implements BufferAllocator { return childAcct.getAllocation(); } + @Override + public DrillBuf getEmpty() { + return empty; + } + + } public PreAllocator getNewPreAllocator(){ - return new PreAlloc(this.acct); + return new PreAlloc(this, this.acct); } public class PreAlloc implements PreAllocator{ int bytes = 0; final Accountor acct; - private PreAlloc(Accountor acct){ + final BufferAllocator allocator; + private PreAlloc(BufferAllocator allocator, Accountor acct){ this.acct = acct; + this.allocator = allocator; } /** @@ -248,8 +271,8 @@ public class TopLevelAllocator implements BufferAllocator { } - public AccountingByteBuf getAllocation(){ - AccountingByteBuf b = new AccountingByteBuf(acct, innerAllocator.buffer(bytes)); + public DrillBuf getAllocation(){ + DrillBuf b = new DrillBuf(allocator, acct, innerAllocator.directBuffer(bytes, bytes)); acct.reserved(bytes, b); return b; } http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/5c5cef06/exec/java-exec/src/main/java/org/apache/drill/exec/ops/FragmentContext.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/ops/FragmentContext.java b/exec/java-exec/src/main/java/org/apache/drill/exec/ops/FragmentContext.java index 6fccb3b..6bdce3b 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/ops/FragmentContext.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/ops/FragmentContext.java @@ -17,8 +17,13 @@ */ package org.apache.drill.exec.ops; -import com.google.common.collect.Lists; -import com.google.common.collect.Maps; +import io.netty.buffer.DrillBuf; + +import java.io.Closeable; +import java.io.IOException; +import java.util.List; +import java.util.Map; + import net.hydromatic.optiq.SchemaPlus; import net.hydromatic.optiq.jdbc.SimpleOptiqSchema; import org.apache.drill.common.config.DrillConfig; @@ -47,6 +52,9 @@ import java.io.Closeable; import java.io.IOException; import java.util.List; import java.util.Map; +import com.carrotsearch.hppc.LongObjectOpenHashMap; +import com.google.common.collect.Lists; +import com.google.common.collect.Maps; /** * Contextual objects required for execution of a particular fragment. @@ -70,6 +78,7 @@ public class FragmentContext implements Closeable { private final long queryStartTime; private final int rootFragmentTimeZone; private final OptionManager fragmentOptions; + private LongObjectOpenHashMap<DrillBuf> managedBuffers = new LongObjectOpenHashMap<>(); private volatile Throwable failureCause; private volatile boolean failed = false; @@ -77,7 +86,7 @@ public class FragmentContext implements Closeable { public FragmentContext(DrillbitContext dbContext, PlanFragment fragment, UserClientConnection connection, FunctionImplementationRegistry funcRegistry) throws OutOfMemoryException, ExecutionSetupException { - this.transformer = new ClassTransformer(); + this.transformer = new ClassTransformer(dbContext.getCache()); this.stats = new FragmentStats(dbContext.getMetrics()); this.context = dbContext; this.connection = connection; @@ -182,7 +191,7 @@ public class FragmentContext implements Closeable { } public <T> T getImplementationClass(CodeGenerator<T> cg) throws ClassTransformationException, IOException { - return transformer.getImplementationClass(this.loader, cg.getDefinition(), cg.generate(), cg.getMaterializedClassName()); + return context.getCompiler().getImplementationClass(cg); } /** @@ -251,10 +260,32 @@ public class FragmentContext implements Closeable { for(Thread thread: daemonThreads){ thread.interrupt(); } + Object[] mbuffers = ((LongObjectOpenHashMap<Object>)(Object)managedBuffers).values; + for(int i =0; i < mbuffers.length; i++){ + if(managedBuffers.allocated[i]) ((DrillBuf)mbuffers[i]).release(); + } + if (buffers != null) { buffers.close(); } allocator.close(); } + public DrillBuf replace(DrillBuf old, int newSize){ + if(managedBuffers.remove(old.memoryAddress()) == null) throw new IllegalStateException("Tried to remove unmanaged buffer."); + old.release(); + return getManagedBuffer(newSize); + } + + public DrillBuf getManagedBuffer(){ + return getManagedBuffer(256); + } + + public DrillBuf getManagedBuffer(int size){ + DrillBuf newBuf = allocator.buffer(size); + managedBuffers.put(newBuf.memoryAddress(), newBuf); + newBuf.setFragmentContext(this); + return newBuf; + } + } http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/5c5cef06/exec/java-exec/src/main/java/org/apache/drill/exec/ops/OperatorContext.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/ops/OperatorContext.java b/exec/java-exec/src/main/java/org/apache/drill/exec/ops/OperatorContext.java index 2d46733..c5dea4f 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/ops/OperatorContext.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/ops/OperatorContext.java @@ -17,6 +17,8 @@ */ package org.apache.drill.exec.ops; +import io.netty.buffer.DrillBuf; + import java.util.Iterator; import org.apache.drill.common.util.Hook.Closeable; @@ -24,6 +26,8 @@ import org.apache.drill.exec.memory.BufferAllocator; import org.apache.drill.exec.memory.OutOfMemoryException; import org.apache.drill.exec.physical.base.PhysicalOperator; +import com.carrotsearch.hppc.LongObjectOpenHashMap; + public class OperatorContext implements Closeable { static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(OperatorContext.class); @@ -31,6 +35,7 @@ public class OperatorContext implements Closeable { private boolean closed = false; private PhysicalOperator popConfig; private OperatorStats stats; + private LongObjectOpenHashMap<DrillBuf> managedBuffers = new LongObjectOpenHashMap<>(); public OperatorContext(PhysicalOperator popConfig, FragmentContext context) throws OutOfMemoryException { this.allocator = context.getNewChildAllocator(popConfig.getInitialAllocation(), popConfig.getMaxAllocation()); @@ -46,6 +51,23 @@ public class OperatorContext implements Closeable { this.stats = stats; } + public DrillBuf replace(DrillBuf old, int newSize){ + if(managedBuffers.remove(old.memoryAddress()) == null) throw new IllegalStateException("Tried to remove unmanaged buffer."); + old.release(); + return getManagedBuffer(newSize); + } + + public DrillBuf getManagedBuffer(){ + return getManagedBuffer(256); + } + + public DrillBuf getManagedBuffer(int size){ + DrillBuf newBuf = allocator.buffer(size); + managedBuffers.put(newBuf.memoryAddress(), newBuf); + newBuf.setOperatorContext(this); + return newBuf; + } + public static int getChildCount(PhysicalOperator popConfig){ Iterator<PhysicalOperator> iter = popConfig.iterator(); int i = 0; @@ -76,6 +98,13 @@ public class OperatorContext implements Closeable { return; } logger.debug("Closing context for {}", popConfig != null ? popConfig.getClass().getName() : null); + + // release managed buffers. + Object[] buffers = ((LongObjectOpenHashMap<Object>)(Object)managedBuffers).values; + for(int i =0; i < buffers.length; i++){ + if(managedBuffers.allocated[i]) ((DrillBuf)buffers[i]).release(); + } + if (allocator != null) { allocator.close(); } http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/5c5cef06/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/OutputMutator.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/OutputMutator.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/OutputMutator.java index 1aec625..32aae07 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/OutputMutator.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/OutputMutator.java @@ -17,13 +17,14 @@ */ package org.apache.drill.exec.physical.impl; +import io.netty.buffer.DrillBuf; + +import java.util.List; + import org.apache.drill.exec.exception.SchemaChangeException; -import org.apache.drill.exec.memory.BufferAllocator; import org.apache.drill.exec.record.MaterializedField; import org.apache.drill.exec.vector.ValueVector; -import java.util.List; - public interface OutputMutator { public <T extends ValueVector> T addField(MaterializedField field, Class<T> clazz) throws SchemaChangeException ; public void allocate(int recordCount); @@ -34,4 +35,5 @@ public interface OutputMutator { * One to many layers to rip out, address it as a separate JIRA. */ public void addFields(List<ValueVector> vvList); + public DrillBuf getManagedBuffer(); } http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/5c5cef06/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScanBatch.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScanBatch.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScanBatch.java index e999c9a..6216305 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScanBatch.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/ScanBatch.java @@ -17,7 +17,7 @@ */ package org.apache.drill.exec.physical.impl; -import io.netty.buffer.Unpooled; +import io.netty.buffer.DrillBuf; import java.util.Collections; import java.util.Iterator; @@ -47,17 +47,12 @@ import org.apache.drill.exec.record.WritableBatch; import org.apache.drill.exec.record.selection.SelectionVector2; import org.apache.drill.exec.record.selection.SelectionVector4; import org.apache.drill.exec.store.RecordReader; -import org.apache.drill.exec.util.BatchPrinter; -import org.apache.drill.exec.util.VectorUtil; import org.apache.drill.exec.vector.AllocationHelper; import org.apache.drill.exec.vector.NullableVarCharVector; import org.apache.drill.exec.vector.ValueVector; import com.google.common.collect.Lists; import com.google.common.collect.Maps; -import org.apache.drill.exec.vector.complex.MapVector; -import org.apache.drill.exec.vector.complex.impl.ComplexWriterImpl; -import org.apache.drill.exec.vector.complex.writer.BaseWriter.ComplexWriter; /** * Record batch used for a particular scan. Operators against one or more @@ -212,6 +207,11 @@ public class ScanBatch implements RecordBatch { private void addPartitionVectors() throws ExecutionSetupException{ try { + if(partitionVectors != null){ + for(ValueVector v : partitionVectors){ + v.clear(); + } + } partitionVectors = Lists.newArrayList(); for (int i : selectedPartitionColumns) { MaterializedField field = MaterializedField.create(SchemaPath.getSimplePath(partitionColumnDesignator + i), Types.optional(MinorType.VARCHAR)); @@ -230,15 +230,9 @@ public class ScanBatch implements RecordBatch { if (partitionValues.length > i) { String val = partitionValues[i]; AllocationHelper.allocate(v, recordCount, val.length()); - NullableVarCharHolder h = new NullableVarCharHolder(); byte[] bytes = val.getBytes(); - h.buffer = Unpooled.buffer(bytes.length); - h.buffer.writeBytes(bytes); - h.start = 0; - h.isSet = 1; - h.end = bytes.length; for (int j = 0; j < recordCount; j++) { - v.getMutator().setSafe(j, h); + v.getMutator().setSafe(j, bytes, 0, bytes.length); } v.getMutator().setValueCount(recordCount); } else { @@ -318,6 +312,11 @@ public class ScanBatch implements RecordBatch { } return false; } + + @Override + public DrillBuf getManagedBuffer() { + return oContext.getManagedBuffer(); + } } @Override @@ -332,6 +331,9 @@ public class ScanBatch implements RecordBatch { public void cleanup(){ container.clear(); + for(ValueVector v : partitionVectors){ + v.clear(); + } fieldVectorMap.clear(); oContext.close(); } @@ -340,5 +342,5 @@ public class ScanBatch implements RecordBatch { public VectorContainer getOutgoingContainer() { throw new UnsupportedOperationException(String.format(" You should not call getOutgoingContainer() for class %s", this.getClass().getCanonicalName())); } - + } http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/5c5cef06/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggTemplate.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggTemplate.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggTemplate.java index eca42c9..22df5f6 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggTemplate.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/aggregate/HashAggTemplate.java @@ -42,6 +42,7 @@ import org.apache.drill.exec.physical.impl.common.ChainedHashTable; import org.apache.drill.exec.physical.impl.common.HashTable; import org.apache.drill.exec.physical.impl.common.HashTableConfig; import org.apache.drill.exec.physical.impl.common.HashTableStats; +import org.apache.drill.exec.physical.impl.common.IndexPointer; import org.apache.drill.exec.record.BatchSchema; import org.apache.drill.exec.record.MaterializedField; import org.apache.drill.exec.record.RecordBatch; @@ -83,9 +84,9 @@ public abstract class HashAggTemplate implements HashAggregator { private HashAggregate hashAggrConfig; private HashTable htable; private ArrayList<BatchHolder> batchHolders; - private IntHolder htIdxHolder; // holder for the Hashtable's internal index returned by put() - private IntHolder outStartIdxHolder; - private IntHolder outNumRecordsHolder; + private IndexPointer htIdxHolder; // holder for the Hashtable's internal index returned by put() + private IndexPointer outStartIdxHolder; + private IndexPointer outNumRecordsHolder; private int numGroupByOutFields = 0; // Note: this should be <= number of group-by fields List<VectorAllocator> wsAllocators = Lists.newArrayList(); // allocators for the workspace vectors @@ -97,14 +98,14 @@ public abstract class HashAggTemplate implements HashAggregator { private OperatorStats stats = null; private HashTableStats htStats = new HashTableStats(); - + public enum Metric implements MetricDef { NUM_BUCKETS, NUM_ENTRIES, NUM_RESIZING, RESIZING_TIME; - + // duplicate for hash ag @Override @@ -150,7 +151,7 @@ public abstract class HashAggTemplate implements HashAggregator { setupInterior(incoming, outgoing, aggrValuesContainer); } - private boolean outputValues(IntHolder outStartIdxHolder, IntHolder outNumRecordsHolder) { + private boolean outputValues(IndexPointer outStartIdxHolder, IndexPointer outNumRecordsHolder) { outStartIdxHolder.value = batchOutputCount; outNumRecordsHolder.value = 0; boolean status = true; @@ -184,7 +185,7 @@ public abstract class HashAggTemplate implements HashAggregator { private int getNumPendingOutput() { return getNumGroups() - batchOutputCount; } - + // Code-generated methods (implemented in HashAggBatch) @RuntimeOverridden @@ -199,14 +200,14 @@ public abstract class HashAggTemplate implements HashAggregator { @Override - public void setup(HashAggregate hashAggrConfig, HashTableConfig htConfig, - FragmentContext context, + public void setup(HashAggregate hashAggrConfig, HashTableConfig htConfig, + FragmentContext context, OperatorStats stats, BufferAllocator allocator, RecordBatch incoming, HashAggBatch outgoing, LogicalExpression[] valueExprs, List<TypedFieldId> valueFieldIds, - TypedFieldId[] groupByOutFieldIds, - VectorContainer outContainer) + TypedFieldId[] groupByOutFieldIds, + VectorContainer outContainer) throws SchemaChangeException, ClassTransformationException, IOException { if (valueExprs == null || valueFieldIds == null) { @@ -236,10 +237,10 @@ public abstract class HashAggTemplate implements HashAggregator { throw new IllegalArgumentException("Currently, hash aggregation is only applicable if there are group-by expressions."); } - this.htIdxHolder = new IntHolder(); - this.outStartIdxHolder = new IntHolder(); - this.outNumRecordsHolder = new IntHolder(); - + this.htIdxHolder = new IndexPointer(); + this.outStartIdxHolder = new IndexPointer(); + this.outNumRecordsHolder = new IndexPointer(); + materializedValueFields = new MaterializedField[valueFieldIds.size()]; if (valueFieldIds.size() > 0) { @@ -317,7 +318,7 @@ public abstract class HashAggTemplate implements HashAggregator { // outcome = out; buildComplete = true; - + updateStats(htable); // output the first batch; remaining batches will be output @@ -426,7 +427,7 @@ public abstract class HashAggTemplate implements HashAggregator { bh.setup(); } - + public IterOutcome outputCurrentBatch() { if (outBatchIndex >= batchHolders.size()) { this.outcome = IterOutcome.NONE; @@ -435,7 +436,7 @@ public abstract class HashAggTemplate implements HashAggregator { // get the number of records in the batch holder that are pending output int numPendingOutput = batchHolders.get(outBatchIndex).getNumPendingOutput(); - + if (!first && numPendingOutput == 0) { this.outcome = IterOutcome.NONE; return outcome; @@ -445,17 +446,17 @@ public abstract class HashAggTemplate implements HashAggregator { boolean outputKeysStatus = true; boolean outputValuesStatus = true; - + outputValuesStatus = batchHolders.get(outBatchIndex).outputValues(outStartIdxHolder, outNumRecordsHolder); int numOutputRecords = outNumRecordsHolder.value; - + if (EXTRA_DEBUG_1) { logger.debug("After output values: outStartIdx = {}, outNumRecords = {}", outStartIdxHolder.value, outNumRecordsHolder.value); } if (outputValuesStatus) { outputKeysStatus = this.htable.outputKeys(outBatchIndex, this.outContainer, outStartIdxHolder.value, outNumRecordsHolder.value) ; } - + if (outputKeysStatus && outputValuesStatus) { // set the value count for outgoing batch value vectors @@ -485,10 +486,10 @@ public abstract class HashAggTemplate implements HashAggregator { } } else { if (!outputKeysStatus) { - logger.debug("Failed to output keys for current batch index: {} ", outBatchIndex); + logger.debug("Failed to output keys for current batch index: {} ", outBatchIndex); for(VectorWrapper<?> v : outContainer) { logger.debug("At the time of failure, size of valuevector in outContainer = {}.", v.getValueVector().getValueCapacity()); - } + } context.fail(new Exception("Failed to output keys for current batch !")); } if (!outputValuesStatus) { http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/5c5cef06/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/HashTable.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/HashTable.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/HashTable.java index aff9751..d9499c7 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/HashTable.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/HashTable.java @@ -31,10 +31,10 @@ public interface HashTable { public static TemplateClassDefinition<HashTable> TEMPLATE_DEFINITION = new TemplateClassDefinition<HashTable>(HashTable.class, HashTableTemplate.class); /** The initial default capacity of the hash table (in terms of number of buckets). */ - static final public int DEFAULT_INITIAL_CAPACITY = 1 << 16; + static final public int DEFAULT_INITIAL_CAPACITY = 1 << 16; /** The maximum capacity of the hash table (in terms of number of buckets). */ - static final public int MAXIMUM_CAPACITY = 1 << 30; + static final public int MAXIMUM_CAPACITY = 1 << 30; /** The default load factor of a hash table. */ static final public float DEFAULT_LOAD_FACTOR = 0.75f; @@ -46,11 +46,11 @@ public interface HashTable { static final public int BATCH_MASK = 0x0000FFFF; public void setup(HashTableConfig htConfig, FragmentContext context, BufferAllocator allocator, - RecordBatch incomingBuild, RecordBatch incomingProbe, + RecordBatch incomingBuild, RecordBatch incomingProbe, RecordBatch outgoing, VectorContainer htContainerOrig); - public PutStatus put(int incomingRowIdx, IntHolder htIdxHolder, int retryCount); - + public PutStatus put(int incomingRowIdx, IndexPointer htIdxHolder, int retryCount); + public int containsKey(int incomingRowIdx, boolean isProbe); public void getStats(HashTableStats stats); http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/5c5cef06/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/HashTableTemplate.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/HashTableTemplate.java b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/HashTableTemplate.java index e53ce05..e6c55bd 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/HashTableTemplate.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/physical/impl/common/HashTableTemplate.java @@ -43,7 +43,7 @@ public abstract class HashTableTemplate implements HashTable { private static final org.slf4j.Logger logger = org.slf4j.LoggerFactory.getLogger(HashTable.class); private static final boolean EXTRA_DEBUG = false; - + private static final int EMPTY_SLOT = -1; // private final int MISSING_VALUE = 65544; @@ -68,7 +68,7 @@ public abstract class HashTableTemplate implements HashTable { private int freeIndex = 0; // Placeholder for the current index while probing the hash table - private IntHolder currentIdxHolder; + private IndexPointer currentIdxHolder; private FragmentContext context; @@ -92,9 +92,9 @@ public abstract class HashTableTemplate implements HashTable { private MaterializedField dummyIntField; private int numResizing = 0; - + private int resizingTime = 0; - + // This class encapsulates the links, keys and values for up to BATCH_SIZE // *unique* records. Thus, suppose there are N incoming record batches, each // of size BATCH_SIZE..but they have M unique keys altogether, the number of @@ -114,11 +114,11 @@ public abstract class HashTableTemplate implements HashTable { private int batchOutputCount = 0; private int batchIndex = 0; - + private BatchHolder(int idx) { this.batchIndex = idx; - + if (idx == 0) { // first batch holder can use the original htContainer htContainer = htContainerOrig; } else { // otherwise create a new one using the original's fields @@ -153,8 +153,8 @@ public abstract class HashTableTemplate implements HashTable { // at the incomingRowIdx. if the key does not match, update the // currentIdxHolder with the index of the next link. private boolean isKeyMatch(int incomingRowIdx, - IntHolder currentIdxHolder, - boolean isProbe) { + IndexPointer currentIdxHolder, + boolean isProbe) { int currentIdxWithinBatch = currentIdxHolder.value & BATCH_MASK; boolean match = false; @@ -164,7 +164,7 @@ public abstract class HashTableTemplate implements HashTable { } assert (currentIdxWithinBatch < HashTable.BATCH_SIZE); assert (incomingRowIdx < HashTable.BATCH_SIZE); - + if (isProbe) match = isKeyMatchInternalProbe(incomingRowIdx, currentIdxWithinBatch); else @@ -213,14 +213,14 @@ public abstract class HashTableTemplate implements HashTable { int size = links.getAccessor().getValueCount(); IntVector newLinks = allocMetadataVector(size, EMPTY_SLOT); IntVector newHashValues = allocMetadataVector(size, 0); - + for (int i = 0; i <= maxOccupiedIdx; i++) { int entryIdxWithinBatch = i; int entryIdx = entryIdxWithinBatch + batchStartIdx; int hash = hashValues.getAccessor().get(entryIdxWithinBatch); // get the already saved hash value int bucketIdx = getBucketIndex(hash, numbuckets); int newStartIdx = newStartIndices.getAccessor().get(bucketIdx); - + if (newStartIdx == EMPTY_SLOT) { // new bucket was empty newStartIndices.getMutator().setSafe(bucketIdx, entryIdx); // update the start index to point to entry newLinks.getMutator().setSafe(entryIdxWithinBatch, EMPTY_SLOT); @@ -240,7 +240,7 @@ public abstract class HashTableTemplate implements HashTable { int batchIdx = ((idx >>> 16) & BATCH_MASK); bh = batchHolders.get(batchIdx); } - + if (bh == this && newLinks.getAccessor().get(idxWithinBatch) == EMPTY_SLOT) { newLinks.getMutator().setSafe(idxWithinBatch, entryIdx); newLinks.getMutator().setSafe(entryIdxWithinBatch, EMPTY_SLOT); @@ -253,7 +253,7 @@ public abstract class HashTableTemplate implements HashTable { bh.links.getMutator().setSafe(idxWithinBatch, entryIdx); // update the link in the other batch newLinks.getMutator().setSafe(entryIdxWithinBatch, EMPTY_SLOT); // update the newLink entry in this batch to mark end of the hash chain newHashValues.getMutator().setSafe(entryIdxWithinBatch, hash); - + if (EXTRA_DEBUG) logger.debug("Followed hash chain in new bucket. bucketIdx = {}, newLinks[ {} ] = {}, newLinks[ {} ] = {}, hash value = {}.", bucketIdx, idxWithinBatch, newLinks.getAccessor().get(idxWithinBatch), entryIdxWithinBatch, newLinks.getAccessor().get(entryIdxWithinBatch), newHashValues.getAccessor().get(entryIdxWithinBatch)); break; @@ -282,19 +282,19 @@ public abstract class HashTableTemplate implements HashTable { BigIntVector vv0 = getValueVector(0); BigIntHolder holder = new BigIntHolder(); */ - + // set the value count for htContainer's value vectors before the transfer .. setValueCount(); - + Iterator<VectorWrapper<?>> outgoingIter = outContainer.iterator(); - + for (VectorWrapper<?> sourceWrapper : htContainer) { ValueVector sourceVV = sourceWrapper.getValueVector(); ValueVector targetVV = outgoingIter.next().getValueVector(); TransferPair tp = sourceVV.makeTransferPair(targetVV); tp.splitAndTransfer(outStartIndex, numRecords); } - + /* logger.debug("Attempting to output keys for batch index: {} from index {} to maxOccupiedIndex {}.", this.batchIndex, 0, maxOccupiedIdx); for (int i = batchOutputCount; i <= maxOccupiedIdx; i++) { @@ -312,17 +312,17 @@ public abstract class HashTableTemplate implements HashTable { return false; } } - */ + */ return true; } private void setValueCount() { for (VectorWrapper<?> vw : htContainer) { ValueVector vv = vw.getValueVector(); - vv.getMutator().setValueCount(maxOccupiedIdx + 1); + vv.getMutator().setValueCount(maxOccupiedIdx + 1); } } - + private void dump(int idx) { while (true) { int idxWithinBatch = idx & BATCH_MASK; @@ -340,8 +340,8 @@ public abstract class HashTableTemplate implements HashTable { links.clear(); hashValues.clear(); } - - // Only used for internal debugging. Get the value vector at a particular index from the htContainer. + + // Only used for internal debugging. Get the value vector at a particular index from the htContainer. // By default this assumes the VV is a BigIntVector. private ValueVector getValueVector(int index) { Object tmp = (htContainer).getValueAccessorById(BigIntVector.class, index).getValueVector(); @@ -413,23 +413,23 @@ public abstract class HashTableTemplate implements HashTable { doSetup(incomingBuild, incomingProbe); - currentIdxHolder = new IntHolder(); + currentIdxHolder = new IndexPointer(); } public int numBuckets() { return startIndices.getAccessor().getValueCount(); } - public int numResizing() { - return numResizing; + public int numResizing() { + return numResizing; } public int size() { return numEntries; } - public void getStats(HashTableStats stats) { - assert stats != null; + public void getStats(HashTableStats stats) { + assert stats != null; stats.numBuckets = numBuckets(); stats.numEntries = numEntries; stats.numResizing = numResizing; @@ -466,8 +466,8 @@ public abstract class HashTableTemplate implements HashTable { return rounded; } - - public PutStatus put(int incomingRowIdx, IntHolder htIdxHolder, int retryCount) { + + public PutStatus put(int incomingRowIdx, IndexPointer htIdxHolder, int retryCount) { HashTable.PutStatus putStatus = put(incomingRowIdx, htIdxHolder) ; int count = retryCount; int numBatchHolders; @@ -482,7 +482,7 @@ public abstract class HashTableTemplate implements HashTable { return putStatus; } - private PutStatus put(int incomingRowIdx, IntHolder htIdxHolder) { + private PutStatus put(int incomingRowIdx, IndexPointer htIdxHolder) { int hash = getHashBuild(incomingRowIdx); hash = Math.abs(hash); @@ -587,7 +587,7 @@ public abstract class HashTableTemplate implements HashTable { int i = getBucketIndex(hash, numBuckets()); int currentIdx = startIndices.getAccessor().get(i); - + if (currentIdx == EMPTY_SLOT) { return -1; } @@ -604,7 +604,7 @@ public abstract class HashTableTemplate implements HashTable { } else if (currentIdxHolder.value == EMPTY_SLOT) { break; } else { - bh = batchHolders.get( (currentIdxHolder.value >>> 16) & BATCH_MASK); + bh = batchHolders.get( (currentIdxHolder.value >>> 16) & BATCH_MASK); } } @@ -641,7 +641,7 @@ public abstract class HashTableTemplate implements HashTable { private void resizeAndRehashIfNeeded() { if (numEntries < threshold) return; - + long t0 = System.currentTimeMillis(); if (EXTRA_DEBUG) logger.debug("Hash table numEntries = {}, threshold = {}; resizing the table...", numEntries, threshold); @@ -695,7 +695,7 @@ public abstract class HashTableTemplate implements HashTable { } return true; } - + private IntVector allocMetadataVector(int size, int initialValue) { IntVector vector = (IntVector) TypeHelper.getNewVector(dummyIntField, allocator); vector.allocateNew(size);
