Repository: hive Updated Branches: refs/heads/master 34331f3c7 -> 727e4b2d2
HIVE-20052: Arrow serde should fill ArrowColumnVector(Decimal) with the given schema precision/scale (Teddy Choi, reviewed by Matt McCline) Signed-off-by: Teddy Choi <pudi...@gmail.com> Project: http://git-wip-us.apache.org/repos/asf/hive/repo Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/727e4b2d Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/727e4b2d Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/727e4b2d Branch: refs/heads/master Commit: 727e4b2d21d6f451a5073f2eaa0241e84225281f Parents: 34331f3 Author: Teddy Choi <pudi...@gmail.com> Authored: Fri Sep 28 11:26:12 2018 +0900 Committer: Teddy Choi <pudi...@gmail.com> Committed: Fri Sep 28 11:26:12 2018 +0900 ---------------------------------------------------------------------- .../hadoop/hive/ql/io/arrow/Serializer.java | 33 ++++++++++++++------ .../io/arrow/TestArrowColumnarBatchSerDe.java | 25 +++++++++++++++ 2 files changed, 49 insertions(+), 9 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hive/blob/727e4b2d/ql/src/java/org/apache/hadoop/hive/ql/io/arrow/Serializer.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/arrow/Serializer.java b/ql/src/java/org/apache/hadoop/hive/ql/io/arrow/Serializer.java index 6b31045..7dffa6b 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/io/arrow/Serializer.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/io/arrow/Serializer.java @@ -38,11 +38,14 @@ import org.apache.arrow.vector.VectorSchemaRoot; import org.apache.arrow.vector.complex.ListVector; import org.apache.arrow.vector.complex.MapVector; import org.apache.arrow.vector.complex.NullableMapVector; +import org.apache.arrow.vector.holders.DecimalHolder; import org.apache.arrow.vector.types.TimeUnit; import org.apache.arrow.vector.types.Types; import org.apache.arrow.vector.types.pojo.ArrowType; import org.apache.arrow.vector.types.pojo.FieldType; import org.apache.hadoop.conf.Configuration; +import org.apache.arrow.vector.util.DecimalUtility; +import org.apache.hadoop.hive.common.type.HiveDecimal; import org.apache.hadoop.hive.conf.HiveConf; import org.apache.hadoop.hive.ql.exec.vector.BytesColumnVector; import org.apache.hadoop.hive.ql.exec.vector.ColumnVector; @@ -61,6 +64,7 @@ import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch; import org.apache.hadoop.hive.ql.exec.vector.expressions.StringExpr; import org.apache.hadoop.hive.ql.metadata.HiveException; import org.apache.hadoop.hive.serde2.SerDeException; +import org.apache.hadoop.hive.serde2.io.HiveDecimalWritable; import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector; import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils; import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector; @@ -74,10 +78,10 @@ import org.apache.hadoop.hive.serde2.typeinfo.StructTypeInfo; import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo; import org.apache.hadoop.hive.serde2.typeinfo.UnionTypeInfo; import org.apache.arrow.memory.BufferAllocator; -import org.apache.hadoop.hive.serde2.io.HiveDecimalWritable; +import java.math.BigDecimal; +import java.math.BigInteger; import java.util.ArrayList; -import java.util.Arrays; import java.util.List; import static org.apache.hadoop.hive.conf.HiveConf.ConfVars.HIVE_ARROW_BATCH_SIZE; @@ -107,6 +111,7 @@ public class Serializer { private int fieldSize; private final NullableMapVector rootVector; + private final DecimalHolder decimalHolder = new DecimalHolder(); //Constructor for non-serde serialization public Serializer(Configuration conf, String attemptId, List<TypeInfo> typeInfos, List<String> fieldNames) { @@ -277,7 +282,7 @@ public class Serializer { } } - private static void write(FieldVector arrowVector, ColumnVector hiveVector, TypeInfo typeInfo, int size, + private void write(FieldVector arrowVector, ColumnVector hiveVector, TypeInfo typeInfo, int size, VectorizedRowBatch vectorizedRowBatch, boolean isNative) { switch (typeInfo.getCategory()) { case PRIMITIVE: @@ -300,7 +305,7 @@ public class Serializer { } } - private static void writeMap(ListVector arrowVector, MapColumnVector hiveVector, MapTypeInfo typeInfo, + private void writeMap(ListVector arrowVector, MapColumnVector hiveVector, MapTypeInfo typeInfo, int size, VectorizedRowBatch vectorizedRowBatch, boolean isNative) { final ListTypeInfo structListTypeInfo = toStructListTypeInfo(typeInfo); final ListColumnVector structListVector = toStructListVector(hiveVector); @@ -317,7 +322,7 @@ public class Serializer { } } - private static void writeUnion(FieldVector arrowVector, ColumnVector hiveVector, TypeInfo typeInfo, + private void writeUnion(FieldVector arrowVector, ColumnVector hiveVector, TypeInfo typeInfo, int size, VectorizedRowBatch vectorizedRowBatch, boolean isNative) { final UnionTypeInfo unionTypeInfo = (UnionTypeInfo) typeInfo; final List<TypeInfo> objectTypeInfos = unionTypeInfo.getAllUnionObjectTypeInfos(); @@ -331,7 +336,7 @@ public class Serializer { write(arrowVector, hiveObjectVector, objectTypeInfo, size, vectorizedRowBatch, isNative); } - private static void writeStruct(MapVector arrowVector, StructColumnVector hiveVector, + private void writeStruct(MapVector arrowVector, StructColumnVector hiveVector, StructTypeInfo typeInfo, int size, VectorizedRowBatch vectorizedRowBatch, boolean isNative) { final List<String> fieldNames = typeInfo.getAllStructFieldNames(); final List<TypeInfo> fieldTypeInfos = typeInfo.getAllStructFieldTypeInfos(); @@ -360,7 +365,7 @@ public class Serializer { } } - private static void writeList(ListVector arrowVector, ListColumnVector hiveVector, ListTypeInfo typeInfo, int size, + private void writeList(ListVector arrowVector, ListColumnVector hiveVector, ListTypeInfo typeInfo, int size, VectorizedRowBatch vectorizedRowBatch, boolean isNative) { final int OFFSET_WIDTH = 4; final TypeInfo elementTypeInfo = typeInfo.getListElementTypeInfo(); @@ -389,7 +394,7 @@ public class Serializer { //Handle cases for both internally constructed //and externally provided (isNative) VectorRowBatch - private static void writePrimitive(FieldVector arrowVector, ColumnVector hiveVector, TypeInfo typeInfo, int size, + private void writePrimitive(FieldVector arrowVector, ColumnVector hiveVector, TypeInfo typeInfo, int size, VectorizedRowBatch vectorizedRowBatch, boolean isNative) { final PrimitiveObjectInspector.PrimitiveCategory primitiveCategory = ((PrimitiveTypeInfo) typeInfo).getPrimitiveCategory(); @@ -833,11 +838,21 @@ public class Serializer { //decimal and decimal64 private static final IntAndVectorsConsumer decimalNullSetter = (i, arrowVector, hiveVector) -> ((DecimalVector) arrowVector).setNull(i); - private static final IntIntAndVectorsConsumer decimalValueSetter = (i, j, arrowVector, hiveVector, typeInfo) + private final IntIntAndVectorsConsumer decimalValueSetter = (i, j, arrowVector, hiveVector, typeInfo) -> { final DecimalVector decimalVector = (DecimalVector) arrowVector; final int scale = decimalVector.getScale(); decimalVector.set(i, ((DecimalColumnVector) hiveVector).vector[j].getHiveDecimal().bigDecimalValue().setScale(scale)); + + final HiveDecimalWritable writable = ((DecimalColumnVector) hiveVector).vector[i]; + decimalHolder.precision = writable.precision(); + decimalHolder.scale = scale; + try (ArrowBuf arrowBuf = allocator.buffer(DecimalHolder.WIDTH)) { + decimalHolder.buffer = arrowBuf; + final BigInteger bigInteger = new BigInteger(writable.getInternalStorage()). + multiply(BigInteger.TEN.pow(scale - writable.scale())); + decimalVector.set(i, new BigDecimal(bigInteger, scale)); + } }; private static final IntIntAndVectorsConsumer decimal64ValueSetter = (i, j, arrowVector, hiveVector, typeInfo) -> { http://git-wip-us.apache.org/repos/asf/hive/blob/727e4b2d/ql/src/test/org/apache/hadoop/hive/ql/io/arrow/TestArrowColumnarBatchSerDe.java ---------------------------------------------------------------------- diff --git a/ql/src/test/org/apache/hadoop/hive/ql/io/arrow/TestArrowColumnarBatchSerDe.java b/ql/src/test/org/apache/hadoop/hive/ql/io/arrow/TestArrowColumnarBatchSerDe.java index 9524040..2e011b5 100644 --- a/ql/src/test/org/apache/hadoop/hive/ql/io/arrow/TestArrowColumnarBatchSerDe.java +++ b/ql/src/test/org/apache/hadoop/hive/ql/io/arrow/TestArrowColumnarBatchSerDe.java @@ -527,6 +527,31 @@ public class TestArrowColumnarBatchSerDe { } @Test + public void testRandomPrimitiveDecimal() throws SerDeException { + String[][] schema = { + {"decimal1", "decimal(38,10)"}, + }; + + int size = 1000; + Object[][] randomDecimals = new Object[size][]; + Random random = new Random(); + for (int i = 0; i < size; i++) { + StringBuilder builder = new StringBuilder(); + builder.append(random.nextBoolean() ? '+' : '-'); + for (int j = 0; j < 28 ; j++) { + builder.append(random.nextInt(10)); + } + builder.append('.'); + for (int j = 0; j < 10; j++) { + builder.append(random.nextInt(10)); + } + randomDecimals[i] = new Object[] {decimalW(HiveDecimal.create(builder.toString()))}; + } + + initAndSerializeAndDeserialize(schema, randomDecimals); + } + + @Test public void testPrimitiveBoolean() throws SerDeException { String[][] schema = { {"boolean1", "boolean"},