Repository: incubator-drill Updated Branches: refs/heads/master fc58c693a -> 0a28365a2
DRILL-1741: kvgen support for complex values (maps/ lists) Project: http://git-wip-us.apache.org/repos/asf/incubator-drill/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-drill/commit/5fe54e0b Tree: http://git-wip-us.apache.org/repos/asf/incubator-drill/tree/5fe54e0b Diff: http://git-wip-us.apache.org/repos/asf/incubator-drill/diff/5fe54e0b Branch: refs/heads/master Commit: 5fe54e0b13ed5ad2df8d3ba1781c52d02a2c66ef Parents: fc58c69 Author: Mehant Baid <[email protected]> Authored: Wed Nov 19 15:34:36 2014 -0800 Committer: Mehant Baid <[email protected]> Committed: Mon Nov 24 11:47:50 2014 -0800 ---------------------------------------------------------------------- .../drill/exec/expr/fn/impl/MappifyUtility.java | 7 +- .../drill/exec/vector/complex/MapUtility.java | 277 +++++++++---------- .../complex/writer/TestComplexTypeReader.java | 22 ++ .../jsoninput/kvgen_complex_input.json | 59 ++++ 4 files changed, 217 insertions(+), 148 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/5fe54e0b/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/impl/MappifyUtility.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/impl/MappifyUtility.java b/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/impl/MappifyUtility.java index 7f7a28c..160910c 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/impl/MappifyUtility.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/expr/fn/impl/MappifyUtility.java @@ -58,11 +58,6 @@ public class MappifyUtility { continue; } - // Check if the value field is not repeated - if (fieldReader.getType().getMode() == TypeProtos.DataMode.REPEATED) { - throw new DrillRuntimeException("kvgen function does not support repeated type values"); - } - // writing a new field, start a new map mapWriter.start(); @@ -77,7 +72,7 @@ public class MappifyUtility { mapWriter.varChar(fieldKey).write(vh); // Write the value to the map - MapUtility.writeToMapFromReader(fieldReader, mapWriter, buffer); + MapUtility.writeToMapFromReader(fieldReader, mapWriter); mapWriter.end(); } http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/5fe54e0b/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/MapUtility.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/MapUtility.java b/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/MapUtility.java index 6c1907a..93f0de9 100644 --- a/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/MapUtility.java +++ b/exec/java-exec/src/main/java/org/apache/drill/exec/vector/complex/MapUtility.java @@ -17,212 +17,205 @@ */ package org.apache.drill.exec.vector.complex; -import com.google.common.base.Charsets; -import io.netty.buffer.DrillBuf; import org.apache.drill.common.exceptions.DrillRuntimeException; import org.apache.drill.common.types.TypeProtos; import org.apache.drill.common.types.TypeProtos.MinorType; import org.apache.drill.common.types.TypeProtos.MajorType; + import org.apache.drill.exec.expr.fn.impl.MappifyUtility; -import org.apache.drill.exec.expr.holders.BigIntHolder; -import org.apache.drill.exec.expr.holders.BitHolder; -import org.apache.drill.exec.expr.holders.DateHolder; -import org.apache.drill.exec.expr.holders.Decimal18Holder; -import org.apache.drill.exec.expr.holders.Decimal28SparseHolder; -import org.apache.drill.exec.expr.holders.Decimal38SparseHolder; -import org.apache.drill.exec.expr.holders.Decimal9Holder; -import org.apache.drill.exec.expr.holders.Float4Holder; -import org.apache.drill.exec.expr.holders.Float8Holder; -import org.apache.drill.exec.expr.holders.IntHolder; -import org.apache.drill.exec.expr.holders.IntervalDayHolder; -import org.apache.drill.exec.expr.holders.IntervalHolder; -import org.apache.drill.exec.expr.holders.IntervalYearHolder; -import org.apache.drill.exec.expr.holders.SmallIntHolder; -import org.apache.drill.exec.expr.holders.TimeHolder; -import org.apache.drill.exec.expr.holders.TimeStampHolder; -import org.apache.drill.exec.expr.holders.TinyIntHolder; -import org.apache.drill.exec.expr.holders.UInt1Holder; -import org.apache.drill.exec.expr.holders.UInt2Holder; -import org.apache.drill.exec.expr.holders.UInt4Holder; -import org.apache.drill.exec.expr.holders.UInt8Holder; -import org.apache.drill.exec.expr.holders.VarBinaryHolder; -import org.apache.drill.exec.expr.holders.VarCharHolder; -import org.apache.drill.exec.util.DecimalUtility; import org.apache.drill.exec.vector.complex.reader.FieldReader; import org.apache.drill.exec.vector.complex.writer.BaseWriter; -import org.joda.time.Period; public class MapUtility { /* * Function to read a value from the field reader, detect the type, construct the appropriate value holder * and use the value holder to write to the Map. */ - public static void writeToMapFromReader(FieldReader fieldReader, BaseWriter.MapWriter mapWriter, DrillBuf buffer) { + // TODO : This should be templatized and generated using freemarker + public static void writeToMapFromReader(FieldReader fieldReader, BaseWriter.MapWriter mapWriter) { MajorType valueMajorType = fieldReader.getType(); MinorType valueMinorType = valueMajorType.getMinorType(); + boolean repeated = false; + + if (valueMajorType.getMode() == TypeProtos.DataMode.REPEATED) { + repeated = true; + } switch (valueMinorType) { case TINYINT: - TinyIntHolder tinyIntHolder = new TinyIntHolder(); - tinyIntHolder.value = fieldReader.readByte(); - mapWriter.tinyInt(MappifyUtility.fieldValue).write(tinyIntHolder); + if (repeated) { + fieldReader.copyAsValue(mapWriter.list(MappifyUtility.fieldValue).tinyInt()); + } else { + fieldReader.copyAsValue(mapWriter.tinyInt(MappifyUtility.fieldValue)); + } break; case SMALLINT: - SmallIntHolder smallIntHolder = new SmallIntHolder(); - smallIntHolder.value = fieldReader.readShort(); - mapWriter.smallInt(MappifyUtility.fieldValue).write(smallIntHolder); + if (repeated) { + fieldReader.copyAsValue(mapWriter.list(MappifyUtility.fieldValue).smallInt()); + } else { + fieldReader.copyAsValue(mapWriter.smallInt(MappifyUtility.fieldValue)); + } break; case BIGINT: - BigIntHolder bh = new BigIntHolder(); - bh.value = fieldReader.readLong(); - mapWriter.bigInt(MappifyUtility.fieldValue).write(bh); + if (repeated) { + fieldReader.copyAsValue(mapWriter.list(MappifyUtility.fieldValue).bigInt()); + } else { + fieldReader.copyAsValue(mapWriter.bigInt(MappifyUtility.fieldValue)); + } break; case INT: - IntHolder ih = new IntHolder(); - ih.value = fieldReader.readInteger(); - mapWriter.integer(MappifyUtility.fieldValue).write(ih); + if (repeated) { + fieldReader.copyAsValue(mapWriter.list(MappifyUtility.fieldValue).integer()); + } else { + fieldReader.copyAsValue(mapWriter.integer(MappifyUtility.fieldValue)); + } break; case UINT1: - UInt1Holder uInt1Holder = new UInt1Holder(); - uInt1Holder.value = fieldReader.readByte(); - mapWriter.uInt1(MappifyUtility.fieldValue).write(uInt1Holder); + if (repeated) { + fieldReader.copyAsValue(mapWriter.list(MappifyUtility.fieldValue).uInt1()); + } else { + fieldReader.copyAsValue(mapWriter.uInt1(MappifyUtility.fieldValue)); + } break; case UINT2: - UInt2Holder uInt2Holder = new UInt2Holder(); - uInt2Holder.value = fieldReader.readCharacter(); - mapWriter.uInt2(MappifyUtility.fieldValue).write(uInt2Holder); + if (repeated) { + fieldReader.copyAsValue(mapWriter.list(MappifyUtility.fieldValue).uInt2()); + } else { + fieldReader.copyAsValue(mapWriter.uInt2(MappifyUtility.fieldValue)); + } break; case UINT4: - UInt4Holder uInt4Holder = new UInt4Holder(); - uInt4Holder.value = fieldReader.readInteger(); - mapWriter.uInt4(MappifyUtility.fieldValue).write(uInt4Holder); + if (repeated) { + fieldReader.copyAsValue(mapWriter.list(MappifyUtility.fieldValue).uInt4()); + } else { + fieldReader.copyAsValue(mapWriter.uInt4(MappifyUtility.fieldValue)); + } break; case UINT8: - UInt8Holder uInt8Holder = new UInt8Holder(); - uInt8Holder.value = fieldReader.readInteger(); - mapWriter.uInt8(MappifyUtility.fieldValue).write(uInt8Holder); + if (repeated) { + fieldReader.copyAsValue(mapWriter.list(MappifyUtility.fieldValue).uInt8()); + } else { + fieldReader.copyAsValue(mapWriter.uInt8(MappifyUtility.fieldValue)); + } break; case DECIMAL9: - Decimal9Holder decimalHolder = new Decimal9Holder(); - decimalHolder.value = fieldReader.readBigDecimal().intValue(); - decimalHolder.scale = valueMajorType.getScale(); - decimalHolder.precision = valueMajorType.getPrecision(); - mapWriter.decimal9(MappifyUtility.fieldValue).write(decimalHolder); + if (repeated) { + fieldReader.copyAsValue(mapWriter.list(MappifyUtility.fieldValue).decimal9()); + } else { + fieldReader.copyAsValue(mapWriter.decimal9(MappifyUtility.fieldValue)); + } break; case DECIMAL18: - Decimal18Holder decimal18Holder = new Decimal18Holder(); - decimal18Holder.value = fieldReader.readBigDecimal().longValue(); - decimal18Holder.scale = valueMajorType.getScale(); - decimal18Holder.precision = valueMajorType.getPrecision(); - mapWriter.decimal18(MappifyUtility.fieldValue).write(decimal18Holder); + if (repeated) { + fieldReader.copyAsValue(mapWriter.list(MappifyUtility.fieldValue).decimal18()); + } else { + fieldReader.copyAsValue(mapWriter.decimal18(MappifyUtility.fieldValue)); + } break; case DECIMAL28SPARSE: - Decimal28SparseHolder decimal28Holder = new Decimal28SparseHolder(); - - // Ensure that the buffer used to store decimal is of sufficient length - buffer.reallocIfNeeded(decimal28Holder.WIDTH); - decimal28Holder.scale = valueMajorType.getScale(); - decimal28Holder.precision = valueMajorType.getPrecision(); - decimal28Holder.buffer = buffer; - decimal28Holder.start = 0; - DecimalUtility.getSparseFromBigDecimal(fieldReader.readBigDecimal(), buffer, 0, decimal28Holder.scale, - decimal28Holder.precision, decimal28Holder.nDecimalDigits); - mapWriter.decimal28Sparse(MappifyUtility.fieldValue).write(decimal28Holder); + if (repeated) { + fieldReader.copyAsValue(mapWriter.list(MappifyUtility.fieldValue).decimal28Sparse()); + } else { + fieldReader.copyAsValue(mapWriter.decimal28Sparse(MappifyUtility.fieldValue)); + } break; case DECIMAL38SPARSE: - Decimal38SparseHolder decimal38Holder = new Decimal38SparseHolder(); - - // Ensure that the buffer used to store decimal is of sufficient length - buffer.reallocIfNeeded(decimal38Holder.WIDTH); - decimal38Holder.scale = valueMajorType.getScale(); - decimal38Holder.precision = valueMajorType.getPrecision(); - decimal38Holder.buffer = buffer; - decimal38Holder.start = 0; - DecimalUtility.getSparseFromBigDecimal(fieldReader.readBigDecimal(), buffer, 0, decimal38Holder.scale, - decimal38Holder.precision, decimal38Holder.nDecimalDigits); - - mapWriter.decimal38Sparse(MappifyUtility.fieldValue).write(decimal38Holder); - break; + if (repeated) { + fieldReader.copyAsValue(mapWriter.list(MappifyUtility.fieldValue).decimal38Sparse()); + } else { + fieldReader.copyAsValue(mapWriter.decimal38Sparse(MappifyUtility.fieldValue)); + } + break; case DATE: - DateHolder dateHolder = new DateHolder(); - dateHolder.value = fieldReader.readLong(); - mapWriter.date(MappifyUtility.fieldValue).write(dateHolder); + if (repeated) { + fieldReader.copyAsValue(mapWriter.list(MappifyUtility.fieldValue).date()); + } else { + fieldReader.copyAsValue(mapWriter.date(MappifyUtility.fieldValue)); + } break; case TIME: - TimeHolder timeHolder = new TimeHolder(); - timeHolder.value = fieldReader.readInteger(); - mapWriter.time(MappifyUtility.fieldValue).write(timeHolder); + if (repeated) { + fieldReader.copyAsValue(mapWriter.list(MappifyUtility.fieldValue).time()); + } else { + fieldReader.copyAsValue(mapWriter.time(MappifyUtility.fieldValue)); + } break; case TIMESTAMP: - TimeStampHolder timeStampHolder = new TimeStampHolder(); - timeStampHolder.value = fieldReader.readLong(); - mapWriter.timeStamp(MappifyUtility.fieldValue).write(timeStampHolder); + if (repeated) { + fieldReader.copyAsValue(mapWriter.list(MappifyUtility.fieldValue).timeStamp()); + } else { + fieldReader.copyAsValue(mapWriter.timeStamp(MappifyUtility.fieldValue)); + } break; case INTERVAL: - IntervalHolder intervalHolder = new IntervalHolder(); - Period period = fieldReader.readPeriod(); - intervalHolder.months = (period.getYears() * org.apache.drill.exec.expr.fn.impl.DateUtility.yearsToMonths) + period.getMonths(); - intervalHolder.days = period.getDays(); - intervalHolder.milliseconds = (period.getHours() * org.apache.drill.exec.expr.fn.impl.DateUtility.hoursToMillis) + - (period.getMinutes() * org.apache.drill.exec.expr.fn.impl.DateUtility.minutesToMillis) + - (period.getSeconds() * org.apache.drill.exec.expr.fn.impl.DateUtility.secondsToMillis) + - (period.getMillis()); - mapWriter.interval(MappifyUtility.fieldValue).write(intervalHolder); + if (repeated) { + fieldReader.copyAsValue(mapWriter.list(MappifyUtility.fieldValue).interval()); + } else { + fieldReader.copyAsValue(mapWriter.interval(MappifyUtility.fieldValue)); + } break; case INTERVALDAY: - IntervalDayHolder intervalDayHolder = new IntervalDayHolder(); - Period periodDay = fieldReader.readPeriod(); - intervalDayHolder.days = periodDay.getDays(); - intervalDayHolder.milliseconds = (periodDay.getHours() * org.apache.drill.exec.expr.fn.impl.DateUtility.hoursToMillis) + - (periodDay.getMinutes() * org.apache.drill.exec.expr.fn.impl.DateUtility.minutesToMillis) + - (periodDay.getSeconds() * org.apache.drill.exec.expr.fn.impl.DateUtility.secondsToMillis) + - (periodDay.getMillis()); - mapWriter.intervalDay(MappifyUtility.fieldValue).write(intervalDayHolder); + if (repeated) { + fieldReader.copyAsValue(mapWriter.list(MappifyUtility.fieldValue).intervalDay()); + } else { + fieldReader.copyAsValue(mapWriter.intervalDay(MappifyUtility.fieldValue)); + } break; case INTERVALYEAR: - IntervalYearHolder intervalYearHolder = new IntervalYearHolder(); - Period periodYear = fieldReader.readPeriod(); - intervalYearHolder.value = (periodYear.getYears() * org.apache.drill.exec.expr.fn.impl.DateUtility.yearsToMonths) + periodYear.getMonths(); - mapWriter.intervalYear(MappifyUtility.fieldValue).write(intervalYearHolder); + if (repeated) { + fieldReader.copyAsValue(mapWriter.list(MappifyUtility.fieldValue).intervalYear()); + } else { + fieldReader.copyAsValue(mapWriter.intervalYear(MappifyUtility.fieldValue)); + } break; case FLOAT4: - Float4Holder float4Holder = new Float4Holder(); - float4Holder.value = fieldReader.readFloat(); - mapWriter.float4(MappifyUtility.fieldValue).write(float4Holder); + if (repeated) { + fieldReader.copyAsValue(mapWriter.list(MappifyUtility.fieldValue).float4()); + } else { + fieldReader.copyAsValue(mapWriter.float4(MappifyUtility.fieldValue)); + } break; case FLOAT8: - Float8Holder float8Holder = new Float8Holder(); - float8Holder.value = fieldReader.readDouble(); - mapWriter.float8(MappifyUtility.fieldValue).write(float8Holder); + if (repeated) { + fieldReader.copyAsValue(mapWriter.list(MappifyUtility.fieldValue).float8()); + } else { + fieldReader.copyAsValue(mapWriter.float8(MappifyUtility.fieldValue)); + } break; case BIT: - BitHolder bitHolder = new BitHolder(); - bitHolder.value = (fieldReader.readBoolean() == true) ? 1 : 0; - mapWriter.bit(MappifyUtility.fieldValue).write(bitHolder); + if (repeated) { + fieldReader.copyAsValue(mapWriter.list(MappifyUtility.fieldValue).bit()); + } else { + fieldReader.copyAsValue(mapWriter.bit(MappifyUtility.fieldValue)); + } break; case VARCHAR: - VarCharHolder vh1 = new VarCharHolder(); - byte[] b = fieldReader.readText().toString().getBytes(Charsets.UTF_8); - buffer.reallocIfNeeded(b.length); - buffer.setBytes(0, b); - vh1.start = 0; - vh1.end = b.length; - vh1.buffer = buffer; - mapWriter.varChar(MappifyUtility.fieldValue).write(vh1); + if (repeated) { + fieldReader.copyAsValue(mapWriter.list(MappifyUtility.fieldValue).varChar()); + } else { + fieldReader.copyAsValue(mapWriter.varChar(MappifyUtility.fieldValue)); + } break; case VARBINARY: - VarBinaryHolder varBinaryHolder = new VarBinaryHolder(); - byte[] b1 = fieldReader.readByteArray(); - buffer.reallocIfNeeded(b1.length); - buffer.setBytes(0, b1); - varBinaryHolder.start = 0; - varBinaryHolder.end = b1.length; - varBinaryHolder.buffer = buffer; - mapWriter.varBinary(MappifyUtility.fieldValue).write(varBinaryHolder); + if (repeated) { + fieldReader.copyAsValue(mapWriter.list(MappifyUtility.fieldValue).varBinary()); + } else { + fieldReader.copyAsValue(mapWriter.varBinary(MappifyUtility.fieldValue)); + } + break; + case MAP: + if (valueMajorType.getMode() == TypeProtos.DataMode.REPEATED) { + fieldReader.copyAsValue(mapWriter.list(MappifyUtility.fieldValue).map()); + } else { + fieldReader.copyAsValue(mapWriter.map(MappifyUtility.fieldValue)); + } + break; + case LIST: + fieldReader.copyAsValue(mapWriter.list(MappifyUtility.fieldValue).list()); break; default: - throw new DrillRuntimeException(String.format("Mappify does not support input of type: %s", valueMinorType)); + throw new DrillRuntimeException(String.format("kvgen does not support input of type: %s", valueMinorType)); } } } http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/5fe54e0b/exec/java-exec/src/test/java/org/apache/drill/exec/vector/complex/writer/TestComplexTypeReader.java ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/test/java/org/apache/drill/exec/vector/complex/writer/TestComplexTypeReader.java b/exec/java-exec/src/test/java/org/apache/drill/exec/vector/complex/writer/TestComplexTypeReader.java index 83e1130..9632bf6 100644 --- a/exec/java-exec/src/test/java/org/apache/drill/exec/vector/complex/writer/TestComplexTypeReader.java +++ b/exec/java-exec/src/test/java/org/apache/drill/exec/vector/complex/writer/TestComplexTypeReader.java @@ -190,6 +190,28 @@ public class TestComplexTypeReader extends BaseTestQuery{ } @Test + // Functions tests kvgen functionality where the 'value' part of the map is complex + public void testKVGenWithComplexValues() throws Exception { + // test where 'value' is a list of integers + test("select kvgen(a) from cp.`jsoninput/kvgen_complex_input.json`"); + + // test where 'value' is a repeated list of floats + test("select kvgen(c) from cp.`jsoninput/kvgen_complex_input.json`"); + + // test where 'value' is a map + test("select kvgen(e) from cp.`jsoninput/kvgen_complex_input.json`"); + + // test where 'value' is a repeated list of maps + test("select kvgen(i) from cp.`jsoninput/kvgen_complex_input.json`"); + + // test where 'value' is a map that contains a list + test("select kvgen(m) from cp.`jsoninput/kvgen_complex_input.json`"); + + // test where 'value' is a map that contains a map + test("select kvgen(p) from cp.`jsoninput/kvgen_complex_input.json`"); + } + + @Test public void testNestedFlatten() throws Exception { test("select flatten(rl) from cp.`jsoninput/input2.json`"); } http://git-wip-us.apache.org/repos/asf/incubator-drill/blob/5fe54e0b/exec/java-exec/src/test/resources/jsoninput/kvgen_complex_input.json ---------------------------------------------------------------------- diff --git a/exec/java-exec/src/test/resources/jsoninput/kvgen_complex_input.json b/exec/java-exec/src/test/resources/jsoninput/kvgen_complex_input.json new file mode 100644 index 0000000..1eeb78f --- /dev/null +++ b/exec/java-exec/src/test/resources/jsoninput/kvgen_complex_input.json @@ -0,0 +1,59 @@ +{ + "a": { + "b": [ + 1, + 2, + 3 + ] + }, + "c": { + "d": [ + [ + 1.1, + 2.2 + ], + [ + 3.3, + 4.4 + ] + ] + }, + "e": { + "f": { + "g": 1, + "h": 2 + } + }, + "i": { + "j": [ + { + "k": "asd", + "l": 2 + }, + { + "k": "foo", + "l": 3 + } + ] + }, + "m": { + "n": { + "o": [ + "2", + "3" + ] + } + }, + "p": { + "q": { + "r": { + "s": 1, + "t": 2 + }, + "u": { + "v": "foo", + "w": 1 + } + } + } +} \ No newline at end of file
