HIVE-16207: Add support for Complex Types in Fast SerDe (Teddy Choi, reviewed by Matt McCline)
Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/d467e172
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/d467e172
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/d467e172
Branch: refs/heads/hive-14535
Commit: d467e172262c23b97e1d55e35798ba530cae5972
Parents: 189d454
Author: Teddy Choi <[email protected]>
Authored: Thu May 18 17:43:36 2017 -0500
Committer: Matt McCline <[email protected]>
Committed: Thu May 18 17:43:36 2017 -0500
----------------------------------------------------------------------
.../hive/ql/exec/vector/VectorAssignRow.java | 220 ++--
.../ql/exec/vector/VectorDeserializeRow.java | 912 ++++++++++-----
.../hive/ql/exec/vector/VectorExtractRow.java | 282 +++--
.../hive/ql/exec/vector/VectorSerializeRow.java | 373 +++++--
.../ql/exec/vector/TestVectorRowObject.java | 13 +-
.../hive/ql/exec/vector/TestVectorSerDeRow.java | 498 ++-------
.../ql/exec/vector/VectorRandomRowSource.java | 776 +++++++++----
.../hive/ql/exec/vector/VectorVerifyFast.java | 698 ++++++++++++
.../mapjoin/fast/CheckFastRowHashMap.java | 50 +-
.../fast/TestVectorMapJoinFastRowHashMap.java | 143 +--
.../exec/vector/mapjoin/fast/VerifyFastRow.java | 874 ++++++++++-----
.../fast/BinarySortableDeserializeRead.java | 299 ++++-
.../fast/BinarySortableSerializeWrite.java | 295 +++--
.../hive/serde2/fast/DeserializeRead.java | 114 +-
.../hadoop/hive/serde2/fast/SerializeWrite.java | 30 +
.../hive/serde2/io/TimestampWritable.java | 4 +-
.../hadoop/hive/serde2/lazy/VerifyLazy.java | 444 ++++++++
.../lazy/fast/LazySimpleDeserializeRead.java | 1034 ++++++++++++++----
.../lazy/fast/LazySimpleSerializeWrite.java | 320 +++---
.../fast/LazyBinaryDeserializeRead.java | 647 +++++++----
.../fast/LazyBinarySerializeWrite.java | 791 ++++++--------
.../StandardUnionObjectInspector.java | 25 +
.../hive/serde2/SerdeRandomRowSource.java | 627 +++++++++--
.../apache/hadoop/hive/serde2/VerifyFast.java | 877 ++++++++++-----
.../hive/serde2/binarysortable/MyTestClass.java | 24 +- .../binarysortable/TestBinarySortableFast.java | 121 +- .../hive/serde2/lazy/TestLazySimpleFast.java | 171 +-- .../serde2/lazybinary/TestLazyBinaryFast.java | 113 +- 28 files changed, 7424 insertions(+), 3351 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hive/blob/d467e172/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorAssignRow.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorAssignRow.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorAssignRow.java index 9c84937..b0d1c75 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorAssignRow.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorAssignRow.java @@ -21,7 +21,13 @@ package org.apache.hadoop.hive.ql.exec.vector; import java.sql.Date; import java.sql.Timestamp; import java.util.List; +import java.util.Map; +import org.apache.hadoop.hive.serde2.objectinspector.StandardUnionObjectInspector.StandardUnion; +import org.apache.hadoop.hive.serde2.typeinfo.ListTypeInfo; +import org.apache.hadoop.hive.serde2.typeinfo.MapTypeInfo; +import org.apache.hadoop.hive.serde2.typeinfo.StructTypeInfo; +import org.apache.hadoop.hive.serde2.typeinfo.UnionTypeInfo; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.hadoop.hive.common.type.HiveChar; @@ -89,11 +95,8 @@ public class VectorAssignRow { // Assigning can be a subset of columns, so this is the projection -- // the batch column numbers. - Category[] targetCategories; - // The data type category of each column being assigned. - - PrimitiveCategory[] targetPrimitiveCategories; - // The data type primitive category of each column being assigned. + TypeInfo[] targetTypeInfos; + // The type info of each column being assigned. 
int[] maxLengths; // For the CHAR and VARCHAR data types, the maximum character length of @@ -117,8 +120,7 @@ public class VectorAssignRow { private void allocateArrays(int count) { isConvert = new boolean[count]; projectionColumnNums = new int[count]; - targetCategories = new Category[count]; - targetPrimitiveCategories = new PrimitiveCategory[count]; + targetTypeInfos = new TypeInfo[count]; maxLengths = new int[count]; } @@ -136,12 +138,10 @@ public class VectorAssignRow { private void initTargetEntry(int logicalColumnIndex, int projectionColumnNum, TypeInfo typeInfo) { isConvert[logicalColumnIndex] = false; projectionColumnNums[logicalColumnIndex] = projectionColumnNum; - Category category = typeInfo.getCategory(); - targetCategories[logicalColumnIndex] = category; - if (category == Category.PRIMITIVE) { - PrimitiveTypeInfo primitiveTypeInfo = (PrimitiveTypeInfo) typeInfo; - PrimitiveCategory primitiveCategory = primitiveTypeInfo.getPrimitiveCategory(); - targetPrimitiveCategories[logicalColumnIndex] = primitiveCategory; + targetTypeInfos[logicalColumnIndex] = typeInfo; + if (typeInfo.getCategory() == Category.PRIMITIVE) { + final PrimitiveTypeInfo primitiveTypeInfo = (PrimitiveTypeInfo) typeInfo; + final PrimitiveCategory primitiveCategory = primitiveTypeInfo.getPrimitiveCategory(); switch (primitiveCategory) { case CHAR: maxLengths[logicalColumnIndex] = ((CharTypeInfo) primitiveTypeInfo).getLength(); @@ -162,15 +162,16 @@ public class VectorAssignRow { */ private void initConvertSourceEntry(int logicalColumnIndex, TypeInfo convertSourceTypeInfo) { isConvert[logicalColumnIndex] = true; - Category convertSourceCategory = convertSourceTypeInfo.getCategory(); + final Category convertSourceCategory = convertSourceTypeInfo.getCategory(); if (convertSourceCategory == Category.PRIMITIVE) { - PrimitiveTypeInfo convertSourcePrimitiveTypeInfo = (PrimitiveTypeInfo) convertSourceTypeInfo; + final PrimitiveTypeInfo convertSourcePrimitiveTypeInfo = (PrimitiveTypeInfo) 
convertSourceTypeInfo; convertSourcePrimitiveObjectInspectors[logicalColumnIndex] = PrimitiveObjectInspectorFactory.getPrimitiveWritableObjectInspector( convertSourcePrimitiveTypeInfo); // These need to be based on the target. - PrimitiveCategory targetPrimitiveCategory = targetPrimitiveCategories[logicalColumnIndex]; + final PrimitiveCategory targetPrimitiveCategory = + ((PrimitiveTypeInfo) targetTypeInfos[logicalColumnIndex]).getPrimitiveCategory(); switch (targetPrimitiveCategory) { case DATE: convertTargetWritables[logicalColumnIndex] = new DateWritable(); @@ -191,17 +192,17 @@ public class VectorAssignRow { public void init(StructObjectInspector structObjectInspector, List<Integer> projectedColumns) throws HiveException { - List<? extends StructField> fields = structObjectInspector.getAllStructFieldRefs(); + final List<? extends StructField> fields = structObjectInspector.getAllStructFieldRefs(); final int count = fields.size(); allocateArrays(count); for (int i = 0; i < count; i++) { - int projectionColumnNum = projectedColumns.get(i); + final int projectionColumnNum = projectedColumns.get(i); - StructField field = fields.get(i); - ObjectInspector fieldInspector = field.getFieldObjectInspector(); - TypeInfo typeInfo = + final StructField field = fields.get(i); + final ObjectInspector fieldInspector = field.getFieldObjectInspector(); + final TypeInfo typeInfo = TypeInfoUtils.getTypeInfoFromTypeString(fieldInspector.getTypeName()); initTargetEntry(i, projectionColumnNum, typeInfo); @@ -214,15 +215,15 @@ public class VectorAssignRow { */ public void init(StructObjectInspector structObjectInspector) throws HiveException { - List<? extends StructField> fields = structObjectInspector.getAllStructFieldRefs(); + final List<? 
extends StructField> fields = structObjectInspector.getAllStructFieldRefs(); final int count = fields.size(); allocateArrays(count); for (int i = 0; i < count; i++) { - StructField field = fields.get(i); - ObjectInspector fieldInspector = field.getFieldObjectInspector(); - TypeInfo typeInfo = + final StructField field = fields.get(i); + final ObjectInspector fieldInspector = field.getFieldObjectInspector(); + final TypeInfo typeInfo = TypeInfoUtils.getTypeInfoFromTypeString(fieldInspector.getTypeName()); initTargetEntry(i, i, typeInfo); @@ -240,7 +241,7 @@ public class VectorAssignRow { for (int i = 0; i < count; i++) { - TypeInfo typeInfo = + final TypeInfo typeInfo = TypeInfoUtils.getTypeInfoFromTypeString(typeNames.get(i)); initTargetEntry(i, i, typeInfo); @@ -268,14 +269,14 @@ public class VectorAssignRow { public int initConversion(TypeInfo[] sourceTypeInfos, TypeInfo[] targetTypeInfos, boolean[] columnsToIncludeTruncated) { - int targetColumnCount; + final int targetColumnCount; if (columnsToIncludeTruncated == null) { targetColumnCount = targetTypeInfos.length; } else { targetColumnCount = Math.min(targetTypeInfos.length, columnsToIncludeTruncated.length); } - int sourceColumnCount = Math.min(sourceTypeInfos.length, targetColumnCount); + final int sourceColumnCount = Math.min(sourceTypeInfos.length, targetColumnCount); allocateArrays(sourceColumnCount); allocateConvertArrays(sourceColumnCount); @@ -287,14 +288,14 @@ public class VectorAssignRow { // Field not included in query. } else { - TypeInfo targetTypeInfo = targetTypeInfos[i]; + final TypeInfo targetTypeInfo = targetTypeInfos[i]; if (targetTypeInfo.getCategory() != ObjectInspector.Category.PRIMITIVE) { // For now, we don't have an assigner for complex types... 
} else { - TypeInfo sourceTypeInfo = sourceTypeInfos[i]; + final TypeInfo sourceTypeInfo = sourceTypeInfos[i]; if (!sourceTypeInfo.equals(targetTypeInfo)) { @@ -333,75 +334,83 @@ public class VectorAssignRow { * @param logicalColumnIndex * @param object The row column object whose type is the target data type. */ - public void assignRowColumn(VectorizedRowBatch batch, int batchIndex, int logicalColumnIndex, - Object object) { - Category targetCategory = targetCategories[logicalColumnIndex]; - if (targetCategory == null) { + public void assignRowColumn( + VectorizedRowBatch batch, int batchIndex, int logicalColumnIndex, Object object) { + + final int projectionColumnNum = projectionColumnNums[logicalColumnIndex]; + final TypeInfo targetTypeInfo = targetTypeInfos[logicalColumnIndex]; + if (targetTypeInfo == null || targetTypeInfo.getCategory() == null) { /* * This is a column that we don't want (i.e. not included) -- we are done. */ return; } - final int projectionColumnNum = projectionColumnNums[logicalColumnIndex]; + assignRowColumn(batch.cols[projectionColumnNum], batchIndex, targetTypeInfo, object); + } + + private void assignRowColumn( + ColumnVector columnVector, int batchIndex, TypeInfo targetTypeInfo, Object object) { + if (object == null) { - VectorizedBatchUtil.setNullColIsNullValue(batch.cols[projectionColumnNum], batchIndex); + VectorizedBatchUtil.setNullColIsNullValue(columnVector, batchIndex); return; } - switch (targetCategory) { + switch (targetTypeInfo.getCategory()) { case PRIMITIVE: { - PrimitiveCategory targetPrimitiveCategory = targetPrimitiveCategories[logicalColumnIndex]; + final PrimitiveCategory targetPrimitiveCategory = + ((PrimitiveTypeInfo) targetTypeInfo).getPrimitiveCategory(); switch (targetPrimitiveCategory) { case VOID: - VectorizedBatchUtil.setNullColIsNullValue(batch.cols[projectionColumnNum], batchIndex); + VectorizedBatchUtil.setNullColIsNullValue(columnVector, batchIndex); return; case BOOLEAN: - ((LongColumnVector) 
batch.cols[projectionColumnNum]).vector[batchIndex] = + ((LongColumnVector) columnVector).vector[batchIndex] = (((BooleanWritable) object).get() ? 1 : 0); break; case BYTE: - ((LongColumnVector) batch.cols[projectionColumnNum]).vector[batchIndex] = + ((LongColumnVector) columnVector).vector[batchIndex] = ((ByteWritable) object).get(); break; case SHORT: - ((LongColumnVector) batch.cols[projectionColumnNum]).vector[batchIndex] = + ((LongColumnVector) columnVector).vector[batchIndex] = ((ShortWritable) object).get(); break; case INT: - ((LongColumnVector) batch.cols[projectionColumnNum]).vector[batchIndex] = + ((LongColumnVector) columnVector).vector[batchIndex] = ((IntWritable) object).get(); break; case LONG: - ((LongColumnVector) batch.cols[projectionColumnNum]).vector[batchIndex] = + ((LongColumnVector) columnVector).vector[batchIndex] = ((LongWritable) object).get(); break; case TIMESTAMP: - ((TimestampColumnVector) batch.cols[projectionColumnNum]).set( + ((TimestampColumnVector) columnVector).set( batchIndex, ((TimestampWritable) object).getTimestamp()); break; case DATE: - ((LongColumnVector) batch.cols[projectionColumnNum]).vector[batchIndex] = + ((LongColumnVector) columnVector).vector[batchIndex] = ((DateWritable) object).getDays(); break; case FLOAT: - ((DoubleColumnVector) batch.cols[projectionColumnNum]).vector[batchIndex] = + ((DoubleColumnVector) columnVector).vector[batchIndex] = ((FloatWritable) object).get(); break; case DOUBLE: - ((DoubleColumnVector) batch.cols[projectionColumnNum]).vector[batchIndex] = + ((DoubleColumnVector) columnVector).vector[batchIndex] = ((DoubleWritable) object).get(); break; case BINARY: { BytesWritable bw = (BytesWritable) object; - ((BytesColumnVector) batch.cols[projectionColumnNum]).setVal( + ((BytesColumnVector) columnVector).setVal( batchIndex, bw.getBytes(), 0, bw.getLength()); } break; case STRING: { Text tw = (Text) object; - ((BytesColumnVector) batch.cols[projectionColumnNum]).setVal( + ((BytesColumnVector) 
columnVector).setVal( batchIndex, tw.getBytes(), 0, tw.getLength()); } break; @@ -420,7 +429,7 @@ public class VectorAssignRow { // TODO: HIVE-13624 Do we need maxLength checking? byte[] bytes = hiveVarchar.getValue().getBytes(); - ((BytesColumnVector) batch.cols[projectionColumnNum]).setVal( + ((BytesColumnVector) columnVector).setVal( batchIndex, bytes, 0, bytes.length); } break; @@ -440,25 +449,25 @@ public class VectorAssignRow { // We store CHAR in vector row batch with padding stripped. byte[] bytes = hiveChar.getStrippedValue().getBytes(); - ((BytesColumnVector) batch.cols[projectionColumnNum]).setVal( + ((BytesColumnVector) columnVector).setVal( batchIndex, bytes, 0, bytes.length); } break; case DECIMAL: if (object instanceof HiveDecimal) { - ((DecimalColumnVector) batch.cols[projectionColumnNum]).set( + ((DecimalColumnVector) columnVector).set( batchIndex, (HiveDecimal) object); } else { - ((DecimalColumnVector) batch.cols[projectionColumnNum]).set( + ((DecimalColumnVector) columnVector).set( batchIndex, (HiveDecimalWritable) object); } break; case INTERVAL_YEAR_MONTH: - ((LongColumnVector) batch.cols[projectionColumnNum]).vector[batchIndex] = + ((LongColumnVector) columnVector).vector[batchIndex] = ((HiveIntervalYearMonthWritable) object).getHiveIntervalYearMonth().getTotalMonths(); break; case INTERVAL_DAY_TIME: - ((IntervalDayTimeColumnVector) batch.cols[projectionColumnNum]).set( + ((IntervalDayTimeColumnVector) columnVector).set( batchIndex, ((HiveIntervalDayTimeWritable) object).getHiveIntervalDayTime()); break; default: @@ -467,14 +476,82 @@ public class VectorAssignRow { } } break; + case LIST: + { + final ListColumnVector listColumnVector = (ListColumnVector) columnVector; + final ListTypeInfo listTypeInfo = (ListTypeInfo) targetTypeInfo; + final TypeInfo elementTypeInfo = listTypeInfo.getListElementTypeInfo(); + final List list = (List) object; + final int size = list.size(); + final int childCount = listColumnVector.childCount; + 
listColumnVector.offsets[batchIndex] = childCount; + listColumnVector.lengths[batchIndex] = size; + listColumnVector.childCount = childCount + size; + listColumnVector.child.ensureSize(childCount + size, true); + + for (int i = 0; i < size; i++) { + assignRowColumn(listColumnVector.child, childCount + i, elementTypeInfo, list.get(i)); + } + } + break; + case MAP: + { + final MapColumnVector mapColumnVector = (MapColumnVector) columnVector; + final MapTypeInfo mapTypeInfo = (MapTypeInfo) targetTypeInfo; + final Map<Object, Object> map = (Map<Object, Object>) object; + final int size = map.size(); + int childCount = mapColumnVector.childCount; + mapColumnVector.offsets[batchIndex] = childCount; + mapColumnVector.lengths[batchIndex] = size; + mapColumnVector.keys.ensureSize(childCount + size, true); + mapColumnVector.values.ensureSize(childCount + size, true); + + for (Map.Entry<Object, Object> entry : map.entrySet()) { + assignRowColumn(mapColumnVector.keys, childCount, mapTypeInfo.getMapKeyTypeInfo(), entry.getKey()); + assignRowColumn(mapColumnVector.values, childCount, mapTypeInfo.getMapValueTypeInfo(), entry.getValue()); + childCount++; + } + mapColumnVector.childCount = childCount; + } + break; + case STRUCT: + { + final StructColumnVector structColumnVector = (StructColumnVector) columnVector; + final StructTypeInfo structTypeInfo = (StructTypeInfo) targetTypeInfo; + final List<TypeInfo> fieldStructTypeInfos = structTypeInfo.getAllStructFieldTypeInfos(); + final int size = fieldStructTypeInfos.size(); + if (object instanceof List) { + final List struct = (List) object; + for (int i = 0; i < size; i++) { + assignRowColumn(structColumnVector.fields[i], batchIndex, fieldStructTypeInfos.get(i), struct.get(i)); + } + } else { + final Object[] array = (Object[]) object; + for (int i = 0; i < size; i++) { + assignRowColumn(structColumnVector.fields[i], batchIndex, fieldStructTypeInfos.get(i), array[i]); + } + } + } + break; + case UNION: + { + final StandardUnion 
union = (StandardUnion) object; + final UnionColumnVector unionColumnVector = (UnionColumnVector) columnVector; + final UnionTypeInfo unionTypeInfo = (UnionTypeInfo) targetTypeInfo; + final List<TypeInfo> objectTypeInfos = unionTypeInfo.getAllUnionObjectTypeInfos(); + final byte tag = union.getTag(); + unionColumnVector.tags[batchIndex] = tag; + assignRowColumn(unionColumnVector.fields[tag], batchIndex, objectTypeInfos.get(tag), union.getObject()); + } + break; default: - throw new RuntimeException("Category " + targetCategory.name() + " not supported"); + throw new RuntimeException("Category " + targetTypeInfo.getCategory().name() + " not supported"); } /* * We always set the null flag to false when there is a value. */ - batch.cols[projectionColumnNum].isNull[batchIndex] = false; + columnVector.isNull[batchIndex] = false; } /** @@ -493,7 +570,7 @@ public class VectorAssignRow { public void assignConvertRowColumn(VectorizedRowBatch batch, int batchIndex, int logicalColumnIndex, Object object) { Preconditions.checkState(isConvert[logicalColumnIndex]); - Category targetCategory = targetCategories[logicalColumnIndex]; + final Category targetCategory = targetTypeInfos[logicalColumnIndex].getCategory(); if (targetCategory == null) { /* * This is a column that we don't want (i.e. not included) -- we are done. 
@@ -508,7 +585,8 @@ public class VectorAssignRow { try { switch (targetCategory) { case PRIMITIVE: - PrimitiveCategory targetPrimitiveCategory = targetPrimitiveCategories[logicalColumnIndex]; + final PrimitiveCategory targetPrimitiveCategory = + ((PrimitiveTypeInfo) targetTypeInfos[logicalColumnIndex]).getPrimitiveCategory(); switch (targetPrimitiveCategory) { case VOID: VectorizedBatchUtil.setNullColIsNullValue(batch.cols[projectionColumnNum], batchIndex); @@ -540,7 +618,7 @@ public class VectorAssignRow { break; case TIMESTAMP: { - Timestamp timestamp = + final Timestamp timestamp = PrimitiveObjectInspectorUtils.getTimestamp( object, convertSourcePrimitiveObjectInspectors[logicalColumnIndex]); if (timestamp == null) { @@ -553,13 +631,13 @@ public class VectorAssignRow { break; case DATE: { - Date date = PrimitiveObjectInspectorUtils.getDate( + final Date date = PrimitiveObjectInspectorUtils.getDate( object, convertSourcePrimitiveObjectInspectors[logicalColumnIndex]); if (date == null) { VectorizedBatchUtil.setNullColIsNullValue(batch.cols[projectionColumnNum], batchIndex); return; } - DateWritable dateWritable = (DateWritable) convertTargetWritables[logicalColumnIndex]; + final DateWritable dateWritable = (DateWritable) convertTargetWritables[logicalColumnIndex]; dateWritable.set(date); ((LongColumnVector) batch.cols[projectionColumnNum]).vector[batchIndex] = dateWritable.getDays(); @@ -577,7 +655,7 @@ public class VectorAssignRow { break; case BINARY: { - BytesWritable bytesWritable = + final BytesWritable bytesWritable = PrimitiveObjectInspectorUtils.getBinary( object, convertSourcePrimitiveObjectInspectors[logicalColumnIndex]); if (bytesWritable == null) { @@ -590,7 +668,7 @@ public class VectorAssignRow { break; case STRING: { - String string = PrimitiveObjectInspectorUtils.getString( + final String string = PrimitiveObjectInspectorUtils.getString( object, convertSourcePrimitiveObjectInspectors[logicalColumnIndex]); if (string == null) { 
VectorizedBatchUtil.setNullColIsNullValue(batch.cols[projectionColumnNum], batchIndex); @@ -606,7 +684,7 @@ public class VectorAssignRow { { // UNDONE: Performance problem with conversion to String, then bytes... - HiveVarchar hiveVarchar = + final HiveVarchar hiveVarchar = PrimitiveObjectInspectorUtils.getHiveVarchar( object, convertSourcePrimitiveObjectInspectors[logicalColumnIndex]); if (hiveVarchar == null) { @@ -625,7 +703,7 @@ public class VectorAssignRow { { // UNDONE: Performance problem with conversion to String, then bytes... - HiveChar hiveChar = + final HiveChar hiveChar = PrimitiveObjectInspectorUtils.getHiveChar( object, convertSourcePrimitiveObjectInspectors[logicalColumnIndex]); if (hiveChar == null) { @@ -636,14 +714,14 @@ public class VectorAssignRow { // TODO: Do we need maxLength checking? - byte[] bytes = hiveChar.getStrippedValue().getBytes(); + final byte[] bytes = hiveChar.getStrippedValue().getBytes(); ((BytesColumnVector) batch.cols[projectionColumnNum]).setVal( batchIndex, bytes, 0, bytes.length); } break; case DECIMAL: { - HiveDecimal hiveDecimal = + final HiveDecimal hiveDecimal = PrimitiveObjectInspectorUtils.getHiveDecimal( object, convertSourcePrimitiveObjectInspectors[logicalColumnIndex]); if (hiveDecimal == null) { @@ -656,7 +734,7 @@ public class VectorAssignRow { break; case INTERVAL_YEAR_MONTH: { - HiveIntervalYearMonth intervalYearMonth = + final HiveIntervalYearMonth intervalYearMonth = PrimitiveObjectInspectorUtils.getHiveIntervalYearMonth( object, convertSourcePrimitiveObjectInspectors[logicalColumnIndex]); if (intervalYearMonth == null) { @@ -669,7 +747,7 @@ public class VectorAssignRow { break; case INTERVAL_DAY_TIME: { - HiveIntervalDayTime intervalDayTime = + final HiveIntervalDayTime intervalDayTime = PrimitiveObjectInspectorUtils.getHiveIntervalDayTime( object, convertSourcePrimitiveObjectInspectors[logicalColumnIndex]); if (intervalDayTime == null) { 
http://git-wip-us.apache.org/repos/asf/hive/blob/d467e172/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorDeserializeRow.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorDeserializeRow.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorDeserializeRow.java index fc82cf7..e37816f 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorDeserializeRow.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorDeserializeRow.java @@ -18,8 +18,8 @@ package org.apache.hadoop.hive.ql.exec.vector; -import java.io.EOFException; import java.io.IOException; +import java.util.ArrayList; import java.util.Arrays; import java.util.List; @@ -42,8 +42,12 @@ import org.apache.hadoop.hive.serde2.io.TimestampWritable; import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector.Category; import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector.PrimitiveCategory; import org.apache.hadoop.hive.serde2.typeinfo.CharTypeInfo; +import org.apache.hadoop.hive.serde2.typeinfo.ListTypeInfo; +import org.apache.hadoop.hive.serde2.typeinfo.MapTypeInfo; import org.apache.hadoop.hive.serde2.typeinfo.PrimitiveTypeInfo; +import org.apache.hadoop.hive.serde2.typeinfo.StructTypeInfo; import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo; +import org.apache.hadoop.hive.serde2.typeinfo.UnionTypeInfo; import org.apache.hadoop.hive.serde2.typeinfo.VarcharTypeInfo; import org.apache.hadoop.io.BooleanWritable; import org.apache.hadoop.io.BytesWritable; @@ -91,6 +95,82 @@ public final class VectorDeserializeRow<T extends DeserializeRead> { private VectorDeserializeRow() { } + private static class Field { + + private Category category; + + private PrimitiveCategory primitiveCategory; + //The data type primitive category of the column being deserialized. 
+ + private int maxLength; + // For the CHAR and VARCHAR data types, the maximum character length of + // the column. Otherwise, 0. + + private boolean isConvert; + + /* + * This member has information for data type conversion. + * Not defined if there is no conversion. + */ + Writable conversionWritable; + // Conversion requires source be placed in writable so we can call upon + // VectorAssignRow to convert and assign the row column. + + private ComplexTypeHelper complexTypeHelper; + // For a complex type, a helper object that describes elements, key/value pairs, + // or fields. + + public Field(PrimitiveCategory primitiveCategory, int maxLength) { + this.category = Category.PRIMITIVE; + this.primitiveCategory = primitiveCategory; + this.maxLength = maxLength; + this.isConvert = false; + this.conversionWritable = null; + this.complexTypeHelper = null; + } + + public Field(Category category, ComplexTypeHelper complexTypeHelper) { + this.category = category; + this.primitiveCategory = null; + this.maxLength = 0; + this.isConvert = false; + this.conversionWritable = null; + this.complexTypeHelper = complexTypeHelper; + } + + public Category getCategory() { + return category; + } + + public PrimitiveCategory getPrimitiveCategory() { + return primitiveCategory; + } + + public int getMaxLength() { + return maxLength; + } + + public void setIsConvert(boolean isConvert) { + this.isConvert = isConvert; + } + + public boolean getIsConvert() { + return isConvert; + } + + public void setConversionWritable(Writable conversionWritable) { + this.conversionWritable = conversionWritable; + } + + public Writable getConversionWritable() { + return conversionWritable; + } + + public ComplexTypeHelper getComplexHelper() { + return complexTypeHelper; + } + } + /* * These members have information for deserializing a row into the VectorizedRowBatch * columns. 
@@ -105,30 +185,11 @@ public final class VectorDeserializeRow<T extends DeserializeRead> { private int[] readFieldLogicalIndices; // The logical indices for reading with readField. - private boolean[] isConvert; - // For each column, are we converting the row column? - private int[] projectionColumnNums; // Assigning can be a subset of columns, so this is the projection -- // the batch column numbers. - private Category[] sourceCategories; - // The data type category of each column being deserialized. - - private PrimitiveCategory[] sourcePrimitiveCategories; - //The data type primitive category of each column being deserialized. - - private int[] maxLengths; - // For the CHAR and VARCHAR data types, the maximum character length of - // the columns. Otherwise, 0. - - /* - * These members have information for data type conversion. - * Not defined if there is no conversion. - */ - Writable[] convertSourceWritables; - // Conversion requires source be placed in writable so we can call upon - // VectorAssignRow to convert and assign the row column. + private Field topLevelFields[]; VectorAssignRow convertVectorAssignRow; // Use its conversion ability. @@ -137,62 +198,117 @@ public final class VectorDeserializeRow<T extends DeserializeRead> { * Allocate the source deserialization related arrays. */ private void allocateArrays(int count) { - isConvert = new boolean[count]; projectionColumnNums = new int[count]; Arrays.fill(projectionColumnNums, -1); - sourceCategories = new Category[count]; - sourcePrimitiveCategories = new PrimitiveCategory[count]; - maxLengths = new int[count]; + topLevelFields = new Field[count]; } - /* - * Allocate the conversion related arrays (optional). 
- */ - private void allocateConvertArrays(int count) { - convertSourceWritables = new Writable[count]; + private Field allocatePrimitiveField(TypeInfo sourceTypeInfo) { + final PrimitiveTypeInfo sourcePrimitiveTypeInfo = (PrimitiveTypeInfo) sourceTypeInfo; + final PrimitiveCategory sourcePrimitiveCategory = sourcePrimitiveTypeInfo.getPrimitiveCategory(); + final int maxLength; + switch (sourcePrimitiveCategory) { + case CHAR: + maxLength = ((CharTypeInfo) sourcePrimitiveTypeInfo).getLength(); + break; + case VARCHAR: + maxLength = ((VarcharTypeInfo) sourcePrimitiveTypeInfo).getLength(); + break; + default: + // No additional data type specific setting. + maxLength = 0; + break; + } + return new Field(sourcePrimitiveCategory, maxLength); + } + + private Field allocateComplexField(TypeInfo sourceTypeInfo) { + final Category category = sourceTypeInfo.getCategory(); + switch (category) { + case LIST: + { + final ListTypeInfo listTypeInfo = (ListTypeInfo) sourceTypeInfo; + final ListComplexTypeHelper listHelper = + new ListComplexTypeHelper( + allocateField(listTypeInfo.getListElementTypeInfo())); + return new Field(category, listHelper); + } + case MAP: + { + final MapTypeInfo mapTypeInfo = (MapTypeInfo) sourceTypeInfo; + final MapComplexTypeHelper mapHelper = + new MapComplexTypeHelper( + allocateField(mapTypeInfo.getMapKeyTypeInfo()), + allocateField(mapTypeInfo.getMapValueTypeInfo())); + return new Field(category, mapHelper); + } + case STRUCT: + { + final StructTypeInfo structTypeInfo = (StructTypeInfo) sourceTypeInfo; + final ArrayList<TypeInfo> fieldTypeInfoList = structTypeInfo.getAllStructFieldTypeInfos(); + final int count = fieldTypeInfoList.size(); + final Field[] fields = new Field[count]; + for (int i = 0; i < count; i++) { + fields[i] = allocateField(fieldTypeInfoList.get(i)); + } + final StructComplexTypeHelper structHelper = + new StructComplexTypeHelper(fields); + return new Field(category, structHelper); + } + case UNION: + { + final UnionTypeInfo 
unionTypeInfo = (UnionTypeInfo) sourceTypeInfo; + final List<TypeInfo> fieldTypeInfoList = unionTypeInfo.getAllUnionObjectTypeInfos(); + final int count = fieldTypeInfoList.size(); + final Field[] fields = new Field[count]; + for (int i = 0; i < count; i++) { + fields[i] = allocateField(fieldTypeInfoList.get(i)); + } + final UnionComplexTypeHelper unionHelper = + new UnionComplexTypeHelper(fields); + return new Field(category, unionHelper); + } + default: + throw new RuntimeException("Category " + category + " not supported"); + } + } + + private Field allocateField(TypeInfo sourceTypeInfo) { + switch (sourceTypeInfo.getCategory()) { + case PRIMITIVE: + return allocatePrimitiveField(sourceTypeInfo); + case LIST: + case MAP: + case STRUCT: + case UNION: + return allocateComplexField(sourceTypeInfo); + default: + throw new RuntimeException("Category " + sourceTypeInfo.getCategory() + " not supported"); + } } /* - * Initialize one column's source deserializtion related arrays. + * Initialize one column's source deserializtion information. 
*/ - private void initSourceEntry(int logicalColumnIndex, int projectionColumnNum, TypeInfo sourceTypeInfo) { - isConvert[logicalColumnIndex] = false; + private void initTopLevelField(int logicalColumnIndex, int projectionColumnNum, TypeInfo sourceTypeInfo) { + projectionColumnNums[logicalColumnIndex] = projectionColumnNum; - Category sourceCategory = sourceTypeInfo.getCategory(); - sourceCategories[logicalColumnIndex] = sourceCategory; - if (sourceCategory == Category.PRIMITIVE) { - PrimitiveTypeInfo sourcePrimitiveTypeInfo = (PrimitiveTypeInfo) sourceTypeInfo; - PrimitiveCategory sourcePrimitiveCategory = sourcePrimitiveTypeInfo.getPrimitiveCategory(); - sourcePrimitiveCategories[logicalColumnIndex] = sourcePrimitiveCategory; - switch (sourcePrimitiveCategory) { - case CHAR: - maxLengths[logicalColumnIndex] = ((CharTypeInfo) sourcePrimitiveTypeInfo).getLength(); - break; - case VARCHAR: - maxLengths[logicalColumnIndex] = ((VarcharTypeInfo) sourcePrimitiveTypeInfo).getLength(); - break; - default: - // No additional data type specific setting. - break; - } - } else { - // We don't currently support complex types. - Preconditions.checkState(false); - } + + topLevelFields[logicalColumnIndex] = allocateField(sourceTypeInfo); } /* - * Initialize the conversion related arrays. Assumes initSourceEntry has already been called. + * Initialize the conversion related arrays. Assumes initTopLevelField has already been called. */ - private void initConvertTargetEntry(int logicalColumnIndex) { - isConvert[logicalColumnIndex] = true; + private void addTopLevelConversion(int logicalColumnIndex) { - if (sourceCategories[logicalColumnIndex] == Category.PRIMITIVE) { - convertSourceWritables[logicalColumnIndex] = - VectorizedBatchUtil.getPrimitiveWritable(sourcePrimitiveCategories[logicalColumnIndex]); - } else { - // We don't currently support complex types.
- Preconditions.checkState(false); + final Field field = topLevelFields[logicalColumnIndex]; + field.setIsConvert(true); + + if (field.getCategory() == Category.PRIMITIVE) { + + field.setConversionWritable( + VectorizedBatchUtil.getPrimitiveWritable(field.getPrimitiveCategory())); } } @@ -206,7 +322,7 @@ public final class VectorDeserializeRow<T extends DeserializeRead> { for (int i = 0; i < count; i++) { int outputColumn = outputColumns[i]; - initSourceEntry(i, outputColumn, sourceTypeInfos[i]); + initTopLevelField(i, outputColumn, sourceTypeInfos[i]); } } @@ -220,7 +336,7 @@ public final class VectorDeserializeRow<T extends DeserializeRead> { for (int i = 0; i < count; i++) { int outputColumn = outputColumns.get(i); - initSourceEntry(i, outputColumn, sourceTypeInfos[i]); + initTopLevelField(i, outputColumn, sourceTypeInfos[i]); } } @@ -234,7 +350,7 @@ public final class VectorDeserializeRow<T extends DeserializeRead> { for (int i = 0; i < count; i++) { int outputColumn = startColumn + i; - initSourceEntry(i, outputColumn, sourceTypeInfos[i]); + initTopLevelField(i, outputColumn, sourceTypeInfos[i]); } } @@ -250,7 +366,7 @@ public final class VectorDeserializeRow<T extends DeserializeRead> { allocateArrays(columnCount); int includedCount = 0; - int[] includedIndices = new int[columnCount]; + final int[] includedIndices = new int[columnCount]; for (int i = 0; i < columnCount; i++) { @@ -260,7 +376,7 @@ public final class VectorDeserializeRow<T extends DeserializeRead> { } else { - initSourceEntry(i, i, sourceTypeInfos[i]); + initTopLevelField(i, i, sourceTypeInfos[i]); includedIndices[includedCount++] = i; } } @@ -298,7 +414,6 @@ public final class VectorDeserializeRow<T extends DeserializeRead> { final int columnCount = sourceTypeInfos.length; allocateArrays(columnCount); - allocateConvertArrays(columnCount); int includedCount = 0; int[] includedIndices = new int[columnCount]; @@ -320,20 +435,22 @@ public final class VectorDeserializeRow<T extends DeserializeRead>
{ if (VectorPartitionConversion.isImplicitVectorColumnConversion(sourceTypeInfo, targetTypeInfo)) { // Do implicit conversion from source type to target type. - initSourceEntry(i, i, sourceTypeInfo); + initTopLevelField(i, i, sourceTypeInfo); } else { // Do formal conversion... - initSourceEntry(i, i, sourceTypeInfo); - initConvertTargetEntry(i); + initTopLevelField(i, i, sourceTypeInfo); + + // UNDONE: No for List and Map; Yes for Struct and Union when field count different... + addTopLevelConversion(i); atLeastOneConvert = true; } } else { // No conversion. - initSourceEntry(i, i, sourceTypeInfo); + initTopLevelField(i, i, sourceTypeInfo); } @@ -360,6 +477,379 @@ public final class VectorDeserializeRow<T extends DeserializeRead> { init(0); } + private void storePrimitiveRowColumn(ColumnVector colVector, Field field, + int batchIndex, boolean canRetainByteRef) throws IOException { + + switch (field.getPrimitiveCategory()) { + case VOID: + VectorizedBatchUtil.setNullColIsNullValue(colVector, batchIndex); + return; + case BOOLEAN: + ((LongColumnVector) colVector).vector[batchIndex] = (deserializeRead.currentBoolean ?
1 : 0); + break; + case BYTE: + ((LongColumnVector) colVector).vector[batchIndex] = deserializeRead.currentByte; + break; + case SHORT: + ((LongColumnVector) colVector).vector[batchIndex] = deserializeRead.currentShort; + break; + case INT: + ((LongColumnVector) colVector).vector[batchIndex] = deserializeRead.currentInt; + break; + case LONG: + ((LongColumnVector) colVector).vector[batchIndex] = deserializeRead.currentLong; + break; + case TIMESTAMP: + ((TimestampColumnVector) colVector).set( + batchIndex, deserializeRead.currentTimestampWritable.getTimestamp()); + break; + case DATE: + ((LongColumnVector) colVector).vector[batchIndex] = deserializeRead.currentDateWritable.getDays(); + break; + case FLOAT: + ((DoubleColumnVector) colVector).vector[batchIndex] = deserializeRead.currentFloat; + break; + case DOUBLE: + ((DoubleColumnVector) colVector).vector[batchIndex] = deserializeRead.currentDouble; + break; + case BINARY: + case STRING: + { + final BytesColumnVector bytesColVec = ((BytesColumnVector) colVector); + if (deserializeRead.currentExternalBufferNeeded) { + bytesColVec.ensureValPreallocated(deserializeRead.currentExternalBufferNeededLen); + deserializeRead.copyToExternalBuffer( + bytesColVec.getValPreallocatedBytes(), bytesColVec.getValPreallocatedStart()); + bytesColVec.setValPreallocated( + batchIndex, + deserializeRead.currentExternalBufferNeededLen); + } else if (canRetainByteRef && inputBytes == deserializeRead.currentBytes) { + bytesColVec.setRef( + batchIndex, + deserializeRead.currentBytes, + deserializeRead.currentBytesStart, + deserializeRead.currentBytesLength); + } else { + bytesColVec.setVal( + batchIndex, + deserializeRead.currentBytes, + deserializeRead.currentBytesStart, + deserializeRead.currentBytesLength); + } + } + break; + case VARCHAR: + { + // Use the basic STRING bytes read to get access, then use our optimal truncate/trim method + // that does not use Java String objects.
+ final BytesColumnVector bytesColVec = ((BytesColumnVector) colVector); + if (deserializeRead.currentExternalBufferNeeded) { + // Write directly into our BytesColumnVector value buffer. + bytesColVec.ensureValPreallocated(deserializeRead.currentExternalBufferNeededLen); + final byte[] convertBuffer = bytesColVec.getValPreallocatedBytes(); + final int convertBufferStart = bytesColVec.getValPreallocatedStart(); + deserializeRead.copyToExternalBuffer( + convertBuffer, + convertBufferStart); + bytesColVec.setValPreallocated( + batchIndex, + StringExpr.truncate( + convertBuffer, + convertBufferStart, + deserializeRead.currentExternalBufferNeededLen, + field.getMaxLength())); + } else if (canRetainByteRef && inputBytes == deserializeRead.currentBytes) { + bytesColVec.setRef( + batchIndex, + deserializeRead.currentBytes, + deserializeRead.currentBytesStart, + StringExpr.truncate( + deserializeRead.currentBytes, + deserializeRead.currentBytesStart, + deserializeRead.currentBytesLength, + field.getMaxLength())); + } else { + bytesColVec.setVal( + batchIndex, + deserializeRead.currentBytes, + deserializeRead.currentBytesStart, + StringExpr.truncate( + deserializeRead.currentBytes, + deserializeRead.currentBytesStart, + deserializeRead.currentBytesLength, + field.getMaxLength())); + } + } + break; + case CHAR: + { + // Use the basic STRING bytes read to get access, then use our optimal truncate/trim method + // that does not use Java String objects. + final BytesColumnVector bytesColVec = ((BytesColumnVector) colVector); + if (deserializeRead.currentExternalBufferNeeded) { + // Write directly into our BytesColumnVector value buffer.
+ bytesColVec.ensureValPreallocated(deserializeRead.currentExternalBufferNeededLen); + final byte[] convertBuffer = bytesColVec.getValPreallocatedBytes(); + final int convertBufferStart = bytesColVec.getValPreallocatedStart(); + deserializeRead.copyToExternalBuffer( + convertBuffer, + convertBufferStart); + bytesColVec.setValPreallocated( + batchIndex, + StringExpr.rightTrimAndTruncate( + convertBuffer, + convertBufferStart, + deserializeRead.currentExternalBufferNeededLen, + field.getMaxLength())); + } else if (canRetainByteRef && inputBytes == deserializeRead.currentBytes) { + bytesColVec.setRef( + batchIndex, + deserializeRead.currentBytes, + deserializeRead.currentBytesStart, + StringExpr.rightTrimAndTruncate( + deserializeRead.currentBytes, + deserializeRead.currentBytesStart, + deserializeRead.currentBytesLength, + field.getMaxLength())); + } else { + bytesColVec.setVal( + batchIndex, + deserializeRead.currentBytes, + deserializeRead.currentBytesStart, + StringExpr.rightTrimAndTruncate( + deserializeRead.currentBytes, + deserializeRead.currentBytesStart, + deserializeRead.currentBytesLength, + field.getMaxLength())); + } + } + break; + case DECIMAL: + // The DecimalColumnVector set method will quickly copy the deserialized decimal writable fields.
+ ((DecimalColumnVector) colVector).set( + batchIndex, deserializeRead.currentHiveDecimalWritable); + break; + case INTERVAL_YEAR_MONTH: + ((LongColumnVector) colVector).vector[batchIndex] = + deserializeRead.currentHiveIntervalYearMonthWritable.getHiveIntervalYearMonth().getTotalMonths(); + break; + case INTERVAL_DAY_TIME: + ((IntervalDayTimeColumnVector) colVector).set( + batchIndex, deserializeRead.currentHiveIntervalDayTimeWritable.getHiveIntervalDayTime()); + break; + default: + throw new RuntimeException("Primitive category " + field.getPrimitiveCategory() + + " not supported"); + } + } + + private static class ComplexTypeHelper { + } + + private static class ListComplexTypeHelper extends ComplexTypeHelper { + + private Field elementField; + + public ListComplexTypeHelper(Field elementField) { + this.elementField = elementField; + } + + public Field getElementField() { + return elementField; + } + } + + private static class MapComplexTypeHelper extends ComplexTypeHelper { + + private Field keyField; + private Field valueField; + + public MapComplexTypeHelper(Field keyField, Field valueField) { + this.keyField = keyField; + this.valueField = valueField; + } + + public Field getKeyField() { + return keyField; + } + + public Field getValueField() { + return valueField; + } + } + + private static class FieldsComplexTypeHelper extends ComplexTypeHelper { + + private Field[] fields; + + public FieldsComplexTypeHelper(Field[] fields) { + this.fields = fields; + } + + public Field[] getFields() { + return fields; + } + } + + private static class StructComplexTypeHelper extends FieldsComplexTypeHelper { + + public StructComplexTypeHelper(Field[] fields) { + super(fields); + } + } + + private static class UnionComplexTypeHelper extends FieldsComplexTypeHelper { + + public UnionComplexTypeHelper(Field[] fields) { + super(fields); + } + } + + // UNDONE: Presumption of *append* + + private void storeComplexFieldRowColumn(ColumnVector fieldColVector, + Field field, int
batchIndex, boolean canRetainByteRef) throws IOException { + + if (!deserializeRead.readComplexField()) { + fieldColVector.isNull[batchIndex] = true; + fieldColVector.noNulls = false; + return; + } + + switch (field.getCategory()) { + case PRIMITIVE: + // A present value must clear any stale null flag at this index, matching the + // top-level rule of always setting the null flag to false when there is a value. + // (VOID re-marks the entry as null inside storePrimitiveRowColumn.) + fieldColVector.isNull[batchIndex] = false; + storePrimitiveRowColumn(fieldColVector, field, batchIndex, canRetainByteRef); + break; + case LIST: + storeListRowColumn(fieldColVector, field, batchIndex, canRetainByteRef); + break; + case MAP: + storeMapRowColumn(fieldColVector, field, batchIndex, canRetainByteRef); + break; + case STRUCT: + storeStructRowColumn(fieldColVector, field, batchIndex, canRetainByteRef); + break; + case UNION: + storeUnionRowColumn(fieldColVector, field, batchIndex, canRetainByteRef); + break; + default: + throw new RuntimeException("Category " + field.getCategory() + " not supported"); + } + } + + private void storeListRowColumn(ColumnVector colVector, + Field field, int batchIndex, boolean canRetainByteRef) throws IOException { + + final ListColumnVector listColVector = (ListColumnVector) colVector; + final ColumnVector elementColVector = listColVector.child; + int offset = listColVector.childCount; + listColVector.isNull[batchIndex] = false; + listColVector.offsets[batchIndex] = offset; + + final ListComplexTypeHelper listHelper = (ListComplexTypeHelper) field.getComplexHelper(); + + int listLength = 0; + while (deserializeRead.isNextComplexMultiValue()) { + + // Ensure child size.
+ // Check against the running offset, not listColVector.childCount: childCount is only + // advanced after this loop, so it is stale for the elements of the current list and a + // long list would otherwise write past the child vector's capacity. + final int childCapacity = elementColVector.isNull.length; + if (offset >= childCapacity * 0.75) { + elementColVector.ensureSize(Math.max(childCapacity * 2, offset + 1), true); + } + + storeComplexFieldRowColumn( + elementColVector, listHelper.getElementField(), offset, canRetainByteRef); + offset++; + listLength++; + } + + listColVector.childCount += listLength; + listColVector.lengths[batchIndex] = listLength; + } + + private void storeMapRowColumn(ColumnVector colVector, + Field field, int batchIndex, boolean canRetainByteRef) throws IOException { + + final MapColumnVector mapColVector = (MapColumnVector) colVector; + final MapComplexTypeHelper mapHelper = (MapComplexTypeHelper) field.getComplexHelper(); + final ColumnVector keysColVector = mapColVector.keys; + final ColumnVector valuesColVector = mapColVector.values; + int offset = mapColVector.childCount; + mapColVector.offsets[batchIndex] = offset; + mapColVector.isNull[batchIndex] = false; + + int keyValueCount = 0; + while (deserializeRead.isNextComplexMultiValue()) { + + // Ensure child size. + // As for lists, use the running offset: mapColVector.childCount is stale until the loop ends. + final int childCapacity = keysColVector.isNull.length; + if (offset >= childCapacity * 0.75) { + final int newChildCapacity = Math.max(childCapacity * 2, offset + 1); + keysColVector.ensureSize(newChildCapacity, true); + valuesColVector.ensureSize(newChildCapacity, true); + } + + // Key. + storeComplexFieldRowColumn( + keysColVector, mapHelper.getKeyField(), offset, canRetainByteRef); + + // Value.
+ storeComplexFieldRowColumn( + valuesColVector, mapHelper.getValueField(), offset, canRetainByteRef); + + offset++; + keyValueCount++; + } + + mapColVector.childCount += keyValueCount; + mapColVector.lengths[batchIndex] = keyValueCount; + } + + private void storeStructRowColumn(ColumnVector colVector, + Field field, int batchIndex, boolean canRetainByteRef) throws IOException { + + final StructColumnVector structColVector = (StructColumnVector) colVector; + final ColumnVector[] colVectorFields = structColVector.fields; + final StructComplexTypeHelper structHelper = (StructComplexTypeHelper) field.getComplexHelper(); + final Field[] fields = structHelper.getFields(); + structColVector.isNull[batchIndex] = false; + + int i = 0; + for (ColumnVector colVectorField : colVectorFields) { + storeComplexFieldRowColumn( + colVectorField, + fields[i], + batchIndex, + canRetainByteRef); + i++; + } + deserializeRead.finishComplexVariableFieldsType(); + } + + private void storeUnionRowColumn(ColumnVector colVector, + Field field, int batchIndex, boolean canRetainByteRef) throws IOException { + + deserializeRead.readComplexField(); + + // The read field of the Union gives us its tag. + final int tag = deserializeRead.currentInt; + + final UnionColumnVector unionColVector = (UnionColumnVector) colVector; + final ColumnVector[] colVectorFields = unionColVector.fields; + final UnionComplexTypeHelper unionHelper = (UnionComplexTypeHelper) field.getComplexHelper(); + + unionColVector.isNull[batchIndex] = false; + unionColVector.tags[batchIndex] = tag; + + storeComplexFieldRowColumn( + colVectorFields[tag], + unionHelper.getFields()[tag], + batchIndex, + canRetainByteRef); + deserializeRead.finishComplexVariableFieldsType(); + } + /** * Store one row column value that is the current value in deserializeRead.
* @@ -374,186 +864,29 @@ public final class VectorDeserializeRow<T extends DeserializeRead> { * @throws IOException */ private void storeRowColumn(VectorizedRowBatch batch, int batchIndex, - int logicalColumnIndex, boolean canRetainByteRef) throws IOException { + Field field, int logicalColumnIndex, boolean canRetainByteRef) throws IOException { final int projectionColumnNum = projectionColumnNums[logicalColumnIndex]; - switch (sourceCategories[logicalColumnIndex]) { + ColumnVector colVector = batch.cols[projectionColumnNum]; + + switch (field.getCategory()) { case PRIMITIVE: - { - PrimitiveCategory sourcePrimitiveCategory = sourcePrimitiveCategories[logicalColumnIndex]; - switch (sourcePrimitiveCategory) { - case VOID: - VectorizedBatchUtil.setNullColIsNullValue(batch.cols[projectionColumnNum], batchIndex); - return; - case BOOLEAN: - ((LongColumnVector) batch.cols[projectionColumnNum]).vector[batchIndex] = - (deserializeRead.currentBoolean ? 1 : 0); - break; - case BYTE: - ((LongColumnVector) batch.cols[projectionColumnNum]).vector[batchIndex] = - deserializeRead.currentByte; - break; - case SHORT: - ((LongColumnVector) batch.cols[projectionColumnNum]).vector[batchIndex] = - deserializeRead.currentShort; - break; - case INT: - ((LongColumnVector) batch.cols[projectionColumnNum]).vector[batchIndex] = - deserializeRead.currentInt; - break; - case LONG: - ((LongColumnVector) batch.cols[projectionColumnNum]).vector[batchIndex] = - deserializeRead.currentLong; - break; - case TIMESTAMP: - ((TimestampColumnVector) batch.cols[projectionColumnNum]).set( - batchIndex, deserializeRead.currentTimestampWritable.getTimestamp()); - break; - case DATE: - ((LongColumnVector) batch.cols[projectionColumnNum]).vector[batchIndex] = - deserializeRead.currentDateWritable.getDays(); - break; - case FLOAT: - ((DoubleColumnVector) batch.cols[projectionColumnNum]).vector[batchIndex] = - deserializeRead.currentFloat; - break; - case DOUBLE: - ((DoubleColumnVector)
batch.cols[projectionColumnNum]).vector[batchIndex] = - deserializeRead.currentDouble; - break; - case BINARY: - case STRING: - { - BytesColumnVector bytesColVec = ((BytesColumnVector) batch.cols[projectionColumnNum]); - if (deserializeRead.currentExternalBufferNeeded) { - bytesColVec.ensureValPreallocated(deserializeRead.currentExternalBufferNeededLen); - deserializeRead.copyToExternalBuffer( - bytesColVec.getValPreallocatedBytes(), bytesColVec.getValPreallocatedStart()); - bytesColVec.setValPreallocated( - batchIndex, - deserializeRead.currentExternalBufferNeededLen); - } else if (canRetainByteRef && inputBytes == deserializeRead.currentBytes) { - bytesColVec.setRef( - batchIndex, - deserializeRead.currentBytes, - deserializeRead.currentBytesStart, - deserializeRead.currentBytesLength); - } else { - bytesColVec.setVal( - batchIndex, - deserializeRead.currentBytes, - deserializeRead.currentBytesStart, - deserializeRead.currentBytesLength); - } - } - break; - case VARCHAR: - { - // Use the basic STRING bytes read to get access, then use our optimal truncate/trim method - // that does not use Java String objects. - BytesColumnVector bytesColVec = ((BytesColumnVector) batch.cols[projectionColumnNum]); - if (deserializeRead.currentExternalBufferNeeded) { - // Write directly into our BytesColumnVector value buffer.
- bytesColVec.ensureValPreallocated(deserializeRead.currentExternalBufferNeededLen); - byte[] convertBuffer = bytesColVec.getValPreallocatedBytes(); - int convertBufferStart = bytesColVec.getValPreallocatedStart(); - deserializeRead.copyToExternalBuffer( - convertBuffer, - convertBufferStart); - bytesColVec.setValPreallocated( - batchIndex, - StringExpr.truncate( - convertBuffer, - convertBufferStart, - deserializeRead.currentExternalBufferNeededLen, - maxLengths[logicalColumnIndex])); - } else if (canRetainByteRef && inputBytes == deserializeRead.currentBytes) { - bytesColVec.setRef( - batchIndex, - deserializeRead.currentBytes, - deserializeRead.currentBytesStart, - StringExpr.truncate( - deserializeRead.currentBytes, - deserializeRead.currentBytesStart, - deserializeRead.currentBytesLength, - maxLengths[logicalColumnIndex])); - } else { - bytesColVec.setVal( - batchIndex, - deserializeRead.currentBytes, - deserializeRead.currentBytesStart, - StringExpr.truncate( - deserializeRead.currentBytes, - deserializeRead.currentBytesStart, - deserializeRead.currentBytesLength, - maxLengths[logicalColumnIndex])); - } - } - break; - case CHAR: - { - // Use the basic STRING bytes read to get access, then use our optimal truncate/trim method - // that does not use Java String objects. - BytesColumnVector bytesColVec = ((BytesColumnVector) batch.cols[projectionColumnNum]); - if (deserializeRead.currentExternalBufferNeeded) { - // Write directly into our BytesColumnVector value buffer.
- bytesColVec.ensureValPreallocated(deserializeRead.currentExternalBufferNeededLen); - byte[] convertBuffer = bytesColVec.getValPreallocatedBytes(); - int convertBufferStart = bytesColVec.getValPreallocatedStart(); - deserializeRead.copyToExternalBuffer( - convertBuffer, - convertBufferStart); - bytesColVec.setValPreallocated( - batchIndex, - StringExpr.rightTrimAndTruncate( - convertBuffer, - convertBufferStart, - deserializeRead.currentExternalBufferNeededLen, - maxLengths[logicalColumnIndex])); - } else if (canRetainByteRef && inputBytes == deserializeRead.currentBytes) { - bytesColVec.setRef( - batchIndex, - deserializeRead.currentBytes, - deserializeRead.currentBytesStart, - StringExpr.rightTrimAndTruncate( - deserializeRead.currentBytes, - deserializeRead.currentBytesStart, - deserializeRead.currentBytesLength, - maxLengths[logicalColumnIndex])); - } else { - bytesColVec.setVal( - batchIndex, - deserializeRead.currentBytes, - deserializeRead.currentBytesStart, - StringExpr.rightTrimAndTruncate( - deserializeRead.currentBytes, - deserializeRead.currentBytesStart, - deserializeRead.currentBytesLength, - maxLengths[logicalColumnIndex])); - } - } - break; - case DECIMAL: - // The DecimalColumnVector set method will quickly copy the deserialized decimal writable fields.
- ((DecimalColumnVector) batch.cols[projectionColumnNum]).set( - batchIndex, deserializeRead.currentHiveDecimalWritable); - break; - case INTERVAL_YEAR_MONTH: - ((LongColumnVector) batch.cols[projectionColumnNum]).vector[batchIndex] = - deserializeRead.currentHiveIntervalYearMonthWritable.getHiveIntervalYearMonth().getTotalMonths(); - break; - case INTERVAL_DAY_TIME: - ((IntervalDayTimeColumnVector) batch.cols[projectionColumnNum]).set( - batchIndex, deserializeRead.currentHiveIntervalDayTimeWritable.getHiveIntervalDayTime()); - break; - default: - throw new RuntimeException("Primitive category " + sourcePrimitiveCategory.name() + - " not supported"); - } - } + storePrimitiveRowColumn(colVector, field, batchIndex, canRetainByteRef); + break; + case LIST: + storeListRowColumn(colVector, field, batchIndex, canRetainByteRef); + break; + case MAP: + storeMapRowColumn(colVector, field, batchIndex, canRetainByteRef); + break; + case STRUCT: + storeStructRowColumn(colVector, field, batchIndex, canRetainByteRef); + break; + case UNION: + storeUnionRowColumn(colVector, field, batchIndex, canRetainByteRef); break; default: - throw new RuntimeException("Category " + sourceCategories[logicalColumnIndex] + " not supported"); + throw new RuntimeException("Category " + field.getCategory() + " not supported"); } // We always set the null flag to false when there is a value.
@@ -572,13 +905,13 @@ public final class VectorDeserializeRow<T extends DeserializeRead> { * @throws IOException */ private void convertRowColumn(VectorizedRowBatch batch, int batchIndex, - int logicalColumnIndex) throws IOException { - final int projectionColumnNum = projectionColumnNums[logicalColumnIndex]; - Writable convertSourceWritable = convertSourceWritables[logicalColumnIndex]; - switch (sourceCategories[logicalColumnIndex]) { + Field field, int logicalColumnIndex) throws IOException { + + // The conversion writable now comes from the per-column Field (it replaces the removed + // convertSourceWritables array). + Writable convertSourceWritable = field.getConversionWritable(); + switch (field.getCategory()) { case PRIMITIVE: { - switch (sourcePrimitiveCategories[logicalColumnIndex]) { + switch (field.getPrimitiveCategory()) { case VOID: convertSourceWritable = null; break; @@ -611,7 +944,9 @@ public final class VectorDeserializeRow<T extends DeserializeRead> { break; case BINARY: if (deserializeRead.currentBytes == null) { - LOG.info("null binary entry: batchIndex " + batchIndex + " projection column num " + projectionColumnNum); + LOG.info( + "null binary entry: batchIndex " + batchIndex + " projection column num " + + projectionColumnNums[logicalColumnIndex]); } ((BytesWritable) convertSourceWritable).set( @@ -622,7 +957,8 @@ public final class VectorDeserializeRow<T extends DeserializeRead> { case STRING: if (deserializeRead.currentBytes == null) { throw new RuntimeException( - "null string entry: batchIndex " + batchIndex + " projection column num " + projectionColumnNum); + "null string entry: batchIndex " + batchIndex + " projection column num " + + projectionColumnNums[logicalColumnIndex]); } // Use org.apache.hadoop.io.Text as our helper to go from byte[] to String. @@ -637,14 +973,15 @@ public final class VectorDeserializeRow<T extends DeserializeRead> { // that does not use Java String objects.
if (deserializeRead.currentBytes == null) { throw new RuntimeException( - "null varchar entry: batchIndex " + batchIndex + " projection column num " + projectionColumnNum); + "null varchar entry: batchIndex " + batchIndex + " projection column num " + + projectionColumnNums[logicalColumnIndex]); } int adjustedLength = StringExpr.truncate( deserializeRead.currentBytes, deserializeRead.currentBytesStart, deserializeRead.currentBytesLength, - maxLengths[logicalColumnIndex]); + field.getMaxLength()); ((HiveVarcharWritable) convertSourceWritable).set( new String( @@ -661,14 +998,15 @@ public final class VectorDeserializeRow<T extends DeserializeRead> { // that does not use Java String objects. if (deserializeRead.currentBytes == null) { throw new RuntimeException( - "null char entry: batchIndex " + batchIndex + " projection column num " + projectionColumnNum); + "null char entry: batchIndex " + batchIndex + " projection column num " + + projectionColumnNums[logicalColumnIndex]); } int adjustedLength = StringExpr.rightTrimAndTruncate( deserializeRead.currentBytes, deserializeRead.currentBytesStart, deserializeRead.currentBytesLength, - maxLengths[logicalColumnIndex]); + field.getMaxLength()); ((HiveCharWritable) convertSourceWritable).set( new String( @@ -691,13 +1029,26 @@ public final class VectorDeserializeRow<T extends DeserializeRead> { deserializeRead.currentHiveIntervalDayTimeWritable); break; default: - throw new RuntimeException("Primitive category " + sourcePrimitiveCategories[logicalColumnIndex] + + throw new RuntimeException("Primitive category " + field.getPrimitiveCategory() + " not supported"); } } break; + + case STRUCT: + case UNION: + // The only aspect of conversion to Struct / Union themselves is add fields as NULL on the end + // (no removal from end? which would mean skipping fields...)
+ + // UNDONE + // NOTE(review): nothing is converted here yet; convertSourceWritable keeps the value + // returned by field.getConversionWritable() (addTopLevelConversion only sets it for PRIMITIVE). + break; + + case LIST: + case MAP: + // Conversion only happens below to List elements or Map key and/or values and not to the + // List or Map itself. + // No break: LIST and MAP fall through to default and throw. default: - throw new RuntimeException("Category " + sourceCategories[logicalColumnIndex] + " not supported"); + throw new RuntimeException("Category " + field.getCategory() + " not supported"); } /* @@ -739,7 +1090,10 @@ public final class VectorDeserializeRow<T extends DeserializeRead> { // Pass false for canRetainByteRef since we will NOT be keeping byte references to the input // bytes with the BytesColumnVector.setRef method. - final int count = isConvert.length; + final int count = topLevelFields.length; + + Field field; + if (!useReadField) { for (int i = 0; i < count; i++) { final int projectionColumnNum = projectionColumnNums[i]; @@ -755,10 +1109,11 @@ public final class VectorDeserializeRow<T extends DeserializeRead> { continue; } // The current* members of deserializeRead have the field value. - if (isConvert[i]) { - convertRowColumn(batch, batchIndex, i); + field = topLevelFields[i]; + if (field.getIsConvert()) { + convertRowColumn(batch, batchIndex, field, i); } else { - storeRowColumn(batch, batchIndex, i, /* canRetainByteRef */ false); + storeRowColumn(batch, batchIndex, field, i, /* canRetainByteRef */ false); } } } else { @@ -773,10 +1128,11 @@ public final class VectorDeserializeRow<T extends DeserializeRead> { continue; } // The current* members of deserializeRead have the field value.
- if (isConvert[logicalIndex]) { - convertRowColumn(batch, batchIndex, logicalIndex); + field = topLevelFields[logicalIndex]; + if (field.getIsConvert()) { + convertRowColumn(batch, batchIndex, field, logicalIndex); } else { - storeRowColumn(batch, batchIndex, logicalIndex, /* canRetainByteRef */ false); + storeRowColumn(batch, batchIndex, field, logicalIndex, /* canRetainByteRef */ false); } } } @@ -803,7 +1159,11 @@ public final class VectorDeserializeRow<T extends DeserializeRead> { * @throws IOException */ public void deserializeByRef(VectorizedRowBatch batch, int batchIndex) throws IOException { - final int count = isConvert.length; + + final int count = topLevelFields.length; + + Field field; + if (!useReadField) { for (int i = 0; i < count; i++) { final int projectionColumnNum = projectionColumnNums[i]; @@ -819,10 +1179,11 @@ public final class VectorDeserializeRow<T extends DeserializeRead> { continue; } // The current* members of deserializeRead have the field value. - if (isConvert[i]) { - convertRowColumn(batch, batchIndex, i); + field = topLevelFields[i]; + if (field.getIsConvert()) { + convertRowColumn(batch, batchIndex, field, i); } else { - storeRowColumn(batch, batchIndex, i, /* canRetainByteRef */ true); + storeRowColumn(batch, batchIndex, field, i, /* canRetainByteRef */ true); } } } else { @@ -837,10 +1198,11 @@ public final class VectorDeserializeRow<T extends DeserializeRead> { continue; } // The current* members of deserializeRead have the field value. - if (isConvert[logicalIndex]) { - convertRowColumn(batch, batchIndex, logicalIndex); + field = topLevelFields[logicalIndex]; + if (field.getIsConvert()) { + convertRowColumn(batch, batchIndex, field, logicalIndex); } else { - storeRowColumn(batch, batchIndex, logicalIndex, /* canRetainByteRef */ true); + storeRowColumn(batch, batchIndex, field, logicalIndex, /* canRetainByteRef */ true); } } }
