HIVE-9605: Remove parquet nested objects from wrapper writable objects (Sergio Pena, reviewed by Ferdinand Xu)
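[Editor's note: the patch below replaces Hadoop's ArrayWritable with a new wrapper, org.apache.hadoop.hive.serde2.io.ObjectArrayWritable, whose 51-line body is added by this commit but is not included in this excerpt. The following is only a minimal sketch of what that wrapper might look like, inferred from the call sites visible in the diff (a constructor taking Object[], a get() method returning Object[], and use as a Writable); the write/readFields behavior shown is an assumption, not taken from the actual file.]

package org.apache.hadoop.hive.serde2.io;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import org.apache.hadoop.io.Writable;

/**
 * Sketch of the new wrapper, inferred from its usage in this patch:
 * it carries a row (or list/map container) as plain Java objects
 * instead of nested Writable wrappers.
 */
public class ObjectArrayWritable implements Writable {
  private final Object[] values;

  public ObjectArrayWritable(final Object[] values) {
    this.values = values;
  }

  /** Returns the wrapped values as plain Java objects. */
  public Object[] get() {
    return values;
  }

  @Override
  public void write(final DataOutput out) throws IOException {
    // Assumption: the wrapper is only used as an in-memory carrier in this patch.
    throw new UnsupportedOperationException("in-memory carrier only");
  }

  @Override
  public void readFields(final DataInput in) throws IOException {
    // Assumption: see above.
    throw new UnsupportedOperationException("in-memory carrier only");
  }
}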
Project: http://git-wip-us.apache.org/repos/asf/hive/repo Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/4157374d Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/4157374d Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/4157374d Branch: refs/heads/master Commit: 4157374d1538afa7a36d2484a9e4abd1016a2ef3 Parents: 5afdea9 Author: Ferdinand Xu <cheng.a...@intel.com> Authored: Tue May 26 14:59:25 2015 -0400 Committer: Ferdinand Xu <cheng.a...@intel.com> Committed: Tue May 26 14:59:25 2015 -0400 ---------------------------------------------------------------------- .../benchmark/storage/ColumnarStorageBench.java | 89 ++++--- .../exec/vector/VectorColumnAssignFactory.java | 74 +++--- .../ql/io/parquet/MapredParquetInputFormat.java | 14 +- .../parquet/VectorizedParquetInputFormat.java | 19 +- .../ql/io/parquet/convert/ConverterParent.java | 4 +- .../convert/DataWritableRecordConverter.java | 6 +- .../ql/io/parquet/convert/ETypeConverter.java | 11 +- .../convert/HiveCollectionConverter.java | 23 +- .../io/parquet/convert/HiveGroupConverter.java | 2 +- .../io/parquet/convert/HiveStructConverter.java | 22 +- .../hive/ql/io/parquet/convert/Repeated.java | 21 +- .../parquet/read/DataWritableReadSupport.java | 6 +- .../read/ParquetRecordReaderWrapper.java | 25 +- .../serde/AbstractParquetMapInspector.java | 23 +- .../serde/ArrayWritableObjectInspector.java | 249 ------------------- .../serde/DeepParquetHiveMapInspector.java | 15 +- .../ObjectArrayWritableObjectInspector.java | 248 ++++++++++++++++++ .../serde/ParquetHiveArrayInspector.java | 35 +-- .../ql/io/parquet/serde/ParquetHiveSerDe.java | 8 +- .../serde/StandardParquetHiveMapInspector.java | 15 +- .../serde/primitive/ParquetByteInspector.java | 3 +- .../serde/primitive/ParquetShortInspector.java | 2 + .../io/parquet/AbstractTestParquetDirect.java | 38 +-- .../ql/io/parquet/TestArrayCompatibility.java | 88 ++++--- .../ql/io/parquet/TestDataWritableWriter.java | 65 +++-- .../hive/ql/io/parquet/TestMapStructures.java | 30 +-- .../parquet/TestMapredParquetInputFormat.java | 4 +- .../io/parquet/TestParquetRowGroupFilter.java | 6 +- .../hive/serde2/io/ObjectArrayWritable.java | 51 ++++ 29 files changed, 618 insertions(+), 578 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hive/blob/4157374d/itests/hive-jmh/src/main/java/org/apache/hive/benchmark/storage/ColumnarStorageBench.java ---------------------------------------------------------------------- diff --git a/itests/hive-jmh/src/main/java/org/apache/hive/benchmark/storage/ColumnarStorageBench.java b/itests/hive-jmh/src/main/java/org/apache/hive/benchmark/storage/ColumnarStorageBench.java index 61c2eb4..eef7800 100644 --- a/itests/hive-jmh/src/main/java/org/apache/hive/benchmark/storage/ColumnarStorageBench.java +++ b/itests/hive-jmh/src/main/java/org/apache/hive/benchmark/storage/ColumnarStorageBench.java @@ -24,12 +24,12 @@ import org.apache.hadoop.hive.ql.io.orc.OrcOutputFormat; import org.apache.hadoop.hive.ql.io.orc.OrcSerde; import org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat; import org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat; -import org.apache.hadoop.hive.ql.io.parquet.serde.ArrayWritableObjectInspector; +import org.apache.hadoop.hive.ql.io.parquet.serde.ObjectArrayWritableObjectInspector; import org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe; import org.apache.hadoop.hive.serde2.SerDe; import org.apache.hadoop.hive.serde2.SerDeException; import 
org.apache.hadoop.hive.serde2.SerDeUtils; -import org.apache.hadoop.hive.serde2.io.DoubleWritable; +import org.apache.hadoop.hive.serde2.io.ObjectArrayWritable; import org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe; import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector; import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory; @@ -40,9 +40,6 @@ import org.apache.hadoop.hive.serde2.typeinfo.ListTypeInfo; import org.apache.hadoop.hive.serde2.typeinfo.MapTypeInfo; import org.apache.hadoop.hive.serde2.typeinfo.StructTypeInfo; import org.apache.hadoop.io.Writable; -import org.apache.hadoop.io.ArrayWritable; -import org.apache.hadoop.io.IntWritable; -import org.apache.hadoop.io.BooleanWritable; import org.apache.hadoop.io.BytesWritable; import org.apache.hadoop.mapred.FileSplit; @@ -172,20 +169,16 @@ public class ColumnarStorageBench { return fs.getFileStatus(path).getLen(); } - private ArrayWritable record(Writable... fields) { - return new ArrayWritable(Writable.class, fields); - } - - private Writable getPrimitiveWritable(final PrimitiveTypeInfo typeInfo) { + private Object createPrimitiveObject(final PrimitiveTypeInfo typeInfo) { Random rand = new Random(); switch (typeInfo.getPrimitiveCategory()) { case INT: - return new IntWritable(rand.nextInt()); + return rand.nextInt(); case DOUBLE: - return new DoubleWritable(rand.nextDouble()); + return rand.nextDouble(); case BOOLEAN: - return new BooleanWritable(rand.nextBoolean()); + return rand.nextBoolean(); case CHAR: case VARCHAR: case STRING: @@ -197,36 +190,52 @@ public class ColumnarStorageBench { } } - private ArrayWritable createRecord(final List<TypeInfo> columnTypes) { - Writable[] fields = new Writable[columnTypes.size()]; + private Object crreateObjectFromType(final TypeInfo typeInfo) { + switch (typeInfo.getCategory()) { + case PRIMITIVE: + return createPrimitiveObject((PrimitiveTypeInfo) typeInfo); + case LIST: + return createListObject((ListTypeInfo) typeInfo); + case MAP: + return createMapObject((MapTypeInfo) typeInfo); + case STRUCT: + return createStructObject(((StructTypeInfo) typeInfo).getAllStructFieldTypeInfos()); + default: + throw new IllegalStateException("Invalid column type: " + typeInfo); + } + } + + private ObjectArrayWritable createListObject(final ListTypeInfo typeInfo) { + List<Object> list = new ArrayList<Object>(); + list.add(crreateObjectFromType(typeInfo.getListElementTypeInfo())); + + return new ObjectArrayWritable(list.toArray()); + } + + private ObjectArrayWritable createMapObject(final MapTypeInfo typeInfo) { + Object[] keyValue = new Object[2]; + keyValue[0] = crreateObjectFromType(typeInfo.getMapKeyTypeInfo()); + keyValue[1] = crreateObjectFromType(typeInfo.getMapValueTypeInfo()); + + ObjectArrayWritable map = new ObjectArrayWritable(keyValue); + + List<ObjectArrayWritable> list = new ArrayList<ObjectArrayWritable>(); + list.add(map); + + return new ObjectArrayWritable(list.toArray()); + } + + + + private ObjectArrayWritable createStructObject(final List<TypeInfo> columnTypes) { + Object[] fields = new Object[columnTypes.size()]; int pos=0; for (TypeInfo type : columnTypes) { - switch (type.getCategory()) { - case PRIMITIVE: - fields[pos++] = getPrimitiveWritable((PrimitiveTypeInfo)type); - break; - case LIST: { - List<TypeInfo> elementType = new ArrayList<TypeInfo>(); - elementType.add(((ListTypeInfo) type).getListElementTypeInfo()); - fields[pos++] = record(createRecord(elementType)); - } break; - case MAP: { - List<TypeInfo> keyValueType = new ArrayList<TypeInfo>(); - 
keyValueType.add(((MapTypeInfo) type).getMapKeyTypeInfo()); - keyValueType.add(((MapTypeInfo) type).getMapValueTypeInfo()); - fields[pos++] = record(record(createRecord(keyValueType))); - } break; - case STRUCT: { - List<TypeInfo> elementType = ((StructTypeInfo) type).getAllStructFieldTypeInfos(); - fields[pos++] = createRecord(elementType); - } break; - default: - throw new IllegalStateException("Invalid column type: " + type); - } + fields[pos++] = crreateObjectFromType(type); } - return record(fields); + return new ObjectArrayWritable(fields); } private ObjectInspector getArrayWritableObjectInspector(final String columnTypes) { @@ -234,11 +243,11 @@ public class ColumnarStorageBench { List<String> columnNameList = Arrays.asList(getColumnNames(columnTypes).split(",")); StructTypeInfo rowTypeInfo = (StructTypeInfo)TypeInfoFactory.getStructTypeInfo(columnNameList, columnTypeList); - return new ArrayWritableObjectInspector(rowTypeInfo); + return new ObjectArrayWritableObjectInspector(rowTypeInfo); } private Object createRandomRow(final String columnTypes) throws SerDeException { - Writable recordWritable = createRecord(TypeInfoUtils.getTypeInfosFromTypeString(columnTypes)); + Writable recordWritable = createStructObject(TypeInfoUtils.getTypeInfosFromTypeString(columnTypes)); Writable simpleWritable = lazySimpleSerDe.serialize(recordWritable, getArrayWritableObjectInspector(columnTypes)); return lazySimpleSerDe.deserialize(simpleWritable); } http://git-wip-us.apache.org/repos/asf/hive/blob/4157374d/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorColumnAssignFactory.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorColumnAssignFactory.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorColumnAssignFactory.java index befe2fc..6f2f1af 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorColumnAssignFactory.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorColumnAssignFactory.java @@ -48,7 +48,6 @@ import org.apache.hadoop.io.FloatWritable; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; -import org.apache.hadoop.io.Writable; import org.apache.hive.common.util.DateUtils; /** @@ -248,10 +247,11 @@ public class VectorColumnAssignFactory { public void assignObjectValue(Object val, int destIndex) throws HiveException { if (val == null) { assignNull(destIndex); - } - else { + } else if (val instanceof BooleanWritable) { BooleanWritable bw = (BooleanWritable) val; assignLong(bw.get() ? 1:0, destIndex); + } else { + assignLong((boolean)val ? 
1:0, destIndex); } } }.init(outputBatch, (LongColumnVector) destCol); @@ -262,10 +262,11 @@ public class VectorColumnAssignFactory { public void assignObjectValue(Object val, int destIndex) throws HiveException { if (val == null) { assignNull(destIndex); - } - else { + } else if (val instanceof ByteWritable) { ByteWritable bw = (ByteWritable) val; assignLong(bw.get(), destIndex); + } else { + assignLong((byte)val, destIndex); } } }.init(outputBatch, (LongColumnVector) destCol); @@ -276,10 +277,11 @@ public class VectorColumnAssignFactory { public void assignObjectValue(Object val, int destIndex) throws HiveException { if (val == null) { assignNull(destIndex); - } - else { + } else if (val instanceof ShortWritable) { ShortWritable bw = (ShortWritable) val; assignLong(bw.get(), destIndex); + } else { + assignLong((short)val, destIndex); } } }.init(outputBatch, (LongColumnVector) destCol); @@ -290,10 +292,11 @@ public class VectorColumnAssignFactory { public void assignObjectValue(Object val, int destIndex) throws HiveException { if (val == null) { assignNull(destIndex); - } - else { + } else if (val instanceof IntWritable) { IntWritable bw = (IntWritable) val; assignLong(bw.get(), destIndex); + } else { + assignLong((int)val, destIndex); } } }.init(outputBatch, (LongColumnVector) destCol); @@ -304,10 +307,11 @@ public class VectorColumnAssignFactory { public void assignObjectValue(Object val, int destIndex) throws HiveException { if (val == null) { assignNull(destIndex); - } - else { + } else if (val instanceof LongWritable) { LongWritable bw = (LongWritable) val; assignLong(bw.get(), destIndex); + } else { + assignLong((long)val, destIndex); } } }.init(outputBatch, (LongColumnVector) destCol); @@ -383,10 +387,11 @@ public class VectorColumnAssignFactory { public void assignObjectValue(Object val, int destIndex) throws HiveException { if (val == null) { assignNull(destIndex); - } - else { + } else if (val instanceof DoubleWritable) { DoubleWritable bw = (DoubleWritable) val; assignDouble(bw.get(), destIndex); + } else { + assignDouble((double)val, destIndex); } } }.init(outputBatch, (DoubleColumnVector) destCol); @@ -397,10 +402,11 @@ public class VectorColumnAssignFactory { public void assignObjectValue(Object val, int destIndex) throws HiveException { if (val == null) { assignNull(destIndex); - } - else { + } else if (val instanceof FloatWritable) { FloatWritable bw = (FloatWritable) val; assignDouble(bw.get(), destIndex); + } else { + assignDouble((float)val, destIndex); } } }.init(outputBatch, (DoubleColumnVector) destCol); @@ -543,45 +549,45 @@ public class VectorColumnAssignFactory { } public static VectorColumnAssign[] buildAssigners(VectorizedRowBatch outputBatch, - Writable[] writables) throws HiveException { + Object[] values) throws HiveException { VectorColumnAssign[] vcas = new VectorColumnAssign[outputBatch.numCols]; - for (int i = 0; i < writables.length; ++i) { - if (writables[i] == null) { + for (int i = 0; i < values.length; ++i) { + if (values[i] == null) { assert(outputBatch.cols[i] == null); vcas[i] = buildObjectAssign(outputBatch, i, PrimitiveCategory.VOID); - } else if (writables[i] instanceof ByteWritable) { + } else if (values[i] instanceof ByteWritable) { vcas[i] = buildObjectAssign(outputBatch, i, PrimitiveCategory.BYTE); - } else if (writables[i] instanceof ShortWritable) { + } else if (values[i] instanceof ShortWritable) { vcas[i] = buildObjectAssign(outputBatch, i, PrimitiveCategory.SHORT); - } else if (writables[i] instanceof IntWritable) { + } else if 
(values[i] instanceof IntWritable) { vcas[i] = buildObjectAssign(outputBatch, i, PrimitiveCategory.INT); - } else if (writables[i] instanceof LongWritable) { + } else if (values[i] instanceof LongWritable) { vcas[i] = buildObjectAssign(outputBatch, i, PrimitiveCategory.LONG); - } else if (writables[i] instanceof FloatWritable) { + } else if (values[i] instanceof FloatWritable) { vcas[i] = buildObjectAssign(outputBatch, i, PrimitiveCategory.FLOAT); - } else if (writables[i] instanceof DoubleWritable) { + } else if (values[i] instanceof DoubleWritable) { vcas[i] = buildObjectAssign(outputBatch, i, PrimitiveCategory.DOUBLE); - } else if (writables[i] instanceof Text) { + } else if (values[i] instanceof Text) { vcas[i] = buildObjectAssign(outputBatch, i, PrimitiveCategory.STRING); - } else if (writables[i] instanceof BytesWritable) { + } else if (values[i] instanceof BytesWritable) { vcas[i] = buildObjectAssign(outputBatch, i, PrimitiveCategory.BINARY); - } else if (writables[i] instanceof TimestampWritable) { + } else if (values[i] instanceof TimestampWritable) { vcas[i] = buildObjectAssign(outputBatch, i, PrimitiveCategory.TIMESTAMP); - } else if (writables[i] instanceof HiveIntervalYearMonthWritable) { + } else if (values[i] instanceof HiveIntervalYearMonthWritable) { vcas[i] = buildObjectAssign(outputBatch, i, PrimitiveCategory.INTERVAL_YEAR_MONTH); - } else if (writables[i] instanceof HiveIntervalDayTimeWritable) { + } else if (values[i] instanceof HiveIntervalDayTimeWritable) { vcas[i] = buildObjectAssign(outputBatch, i, PrimitiveCategory.INTERVAL_DAY_TIME); - } else if (writables[i] instanceof BooleanWritable) { + } else if (values[i] instanceof BooleanWritable) { vcas[i] = buildObjectAssign(outputBatch, i, PrimitiveCategory.BOOLEAN); - } else if (writables[i] instanceof HiveDecimalWritable) { + } else if (values[i] instanceof HiveDecimalWritable) { vcas[i] = buildObjectAssign(outputBatch, i, PrimitiveCategory.DECIMAL); - } else if (writables[i] instanceof HiveCharWritable) { + } else if (values[i] instanceof HiveCharWritable) { vcas[i] = buildObjectAssign(outputBatch, i, PrimitiveCategory.CHAR); - } else if (writables[i] instanceof HiveVarcharWritable) { + } else if (values[i] instanceof HiveVarcharWritable) { vcas[i] = buildObjectAssign(outputBatch, i, PrimitiveCategory.VARCHAR); } else { throw new HiveException("Unimplemented vector assigner for writable type " + - writables[i].getClass()); + values[i].getClass()); } } return vcas; http://git-wip-us.apache.org/repos/asf/hive/blob/4157374d/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/MapredParquetInputFormat.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/MapredParquetInputFormat.java b/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/MapredParquetInputFormat.java index f4f0f07..f7adb39 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/MapredParquetInputFormat.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/MapredParquetInputFormat.java @@ -20,7 +20,7 @@ import org.apache.hadoop.hive.ql.exec.Utilities; import org.apache.hadoop.hive.ql.exec.vector.VectorizedInputFormatInterface; import org.apache.hadoop.hive.ql.io.parquet.read.DataWritableReadSupport; import org.apache.hadoop.hive.ql.io.parquet.read.ParquetRecordReaderWrapper; -import org.apache.hadoop.io.ArrayWritable; +import org.apache.hadoop.hive.serde2.io.ObjectArrayWritable; import org.apache.hadoop.io.NullWritable; import org.apache.hadoop.mapred.FileInputFormat; 
import org.apache.hadoop.mapred.RecordReader; @@ -35,26 +35,26 @@ import parquet.hadoop.ParquetInputFormat; * NOTE: With HIVE-9235 we removed "implements VectorizedParquetInputFormat" since all data types * are not currently supported. Removing the interface turns off vectorization. */ -public class MapredParquetInputFormat extends FileInputFormat<NullWritable, ArrayWritable> { +public class MapredParquetInputFormat extends FileInputFormat<NullWritable, ObjectArrayWritable> { private static final Log LOG = LogFactory.getLog(MapredParquetInputFormat.class); - private final ParquetInputFormat<ArrayWritable> realInput; + private final ParquetInputFormat<ObjectArrayWritable> realInput; private final transient VectorizedParquetInputFormat vectorizedSelf; public MapredParquetInputFormat() { - this(new ParquetInputFormat<ArrayWritable>(DataWritableReadSupport.class)); + this(new ParquetInputFormat<ObjectArrayWritable>(DataWritableReadSupport.class)); } - protected MapredParquetInputFormat(final ParquetInputFormat<ArrayWritable> inputFormat) { + protected MapredParquetInputFormat(final ParquetInputFormat<ObjectArrayWritable> inputFormat) { this.realInput = inputFormat; vectorizedSelf = new VectorizedParquetInputFormat(inputFormat); } @SuppressWarnings({ "unchecked", "rawtypes" }) @Override - public org.apache.hadoop.mapred.RecordReader<NullWritable, ArrayWritable> getRecordReader( + public org.apache.hadoop.mapred.RecordReader<NullWritable, ObjectArrayWritable> getRecordReader( final org.apache.hadoop.mapred.InputSplit split, final org.apache.hadoop.mapred.JobConf job, final org.apache.hadoop.mapred.Reporter reporter @@ -70,7 +70,7 @@ public class MapredParquetInputFormat extends FileInputFormat<NullWritable, Arra if (LOG.isDebugEnabled()) { LOG.debug("Using row-mode record reader"); } - return (RecordReader<NullWritable, ArrayWritable>) + return (RecordReader<NullWritable, ObjectArrayWritable>) new ParquetRecordReaderWrapper(realInput, split, job, reporter); } } catch (final InterruptedException e) { http://git-wip-us.apache.org/repos/asf/hive/blob/4157374d/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/VectorizedParquetInputFormat.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/VectorizedParquetInputFormat.java b/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/VectorizedParquetInputFormat.java index 843e079..c557963 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/VectorizedParquetInputFormat.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/VectorizedParquetInputFormat.java @@ -23,9 +23,8 @@ import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch; import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatchCtx; import org.apache.hadoop.hive.ql.io.parquet.read.ParquetRecordReaderWrapper; import org.apache.hadoop.hive.ql.metadata.HiveException; -import org.apache.hadoop.io.ArrayWritable; +import org.apache.hadoop.hive.serde2.io.ObjectArrayWritable; import org.apache.hadoop.io.NullWritable; -import org.apache.hadoop.io.Writable; import org.apache.hadoop.mapred.FileInputFormat; import org.apache.hadoop.mapred.FileSplit; import org.apache.hadoop.mapred.InputSplit; @@ -52,12 +51,12 @@ public class VectorizedParquetInputFormat extends FileInputFormat<NullWritable, private final ParquetRecordReaderWrapper internalReader; private VectorizedRowBatchCtx rbCtx; - private ArrayWritable internalValues; + private ObjectArrayWritable internalValues; private NullWritable internalKey; 
private VectorColumnAssign[] assigners; public VectorizedParquetRecordReader( - ParquetInputFormat<ArrayWritable> realInput, + ParquetInputFormat<ObjectArrayWritable> realInput, FileSplit split, JobConf conf, Reporter reporter) throws IOException, InterruptedException { internalReader = new ParquetRecordReaderWrapper( @@ -120,17 +119,17 @@ public class VectorizedParquetInputFormat extends FileInputFormat<NullWritable, outputBatch.endOfFile = true; break; } - Writable[] writables = internalValues.get(); + Object[] values = internalValues.get(); if (null == assigners) { // Normally we'd build the assigners from the rbCtx.rowOI, but with Parquet // we have a discrepancy between the metadata type (Eg. tinyint -> BYTE) and // the writable value (IntWritable). see Parquet's ETypeConverter class. - assigners = VectorColumnAssignFactory.buildAssigners(outputBatch, writables); + assigners = VectorColumnAssignFactory.buildAssigners(outputBatch, values); } - for(int i = 0; i < writables.length; ++i) { - assigners[i].assignObjectValue(writables[i], outputBatch.size); + for(int i = 0; i < values.length; ++i) { + assigners[i].assignObjectValue(values[i], outputBatch.size); } ++outputBatch.size; } @@ -141,9 +140,9 @@ public class VectorizedParquetInputFormat extends FileInputFormat<NullWritable, } } - private final ParquetInputFormat<ArrayWritable> realInput; + private final ParquetInputFormat<ObjectArrayWritable> realInput; - public VectorizedParquetInputFormat(ParquetInputFormat<ArrayWritable> realInput) { + public VectorizedParquetInputFormat(ParquetInputFormat<ObjectArrayWritable> realInput) { this.realInput = realInput; } http://git-wip-us.apache.org/repos/asf/hive/blob/4157374d/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/convert/ConverterParent.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/convert/ConverterParent.java b/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/convert/ConverterParent.java index 6ff6b47..3a6cb9e 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/convert/ConverterParent.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/convert/ConverterParent.java @@ -13,12 +13,10 @@ */ package org.apache.hadoop.hive.ql.io.parquet.convert; -import org.apache.hadoop.io.Writable; - import java.util.Map; interface ConverterParent { - void set(int index, Writable value); + void set(int index, Object value); Map<String, String> getMetadata(); } http://git-wip-us.apache.org/repos/asf/hive/blob/4157374d/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/convert/DataWritableRecordConverter.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/convert/DataWritableRecordConverter.java b/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/convert/DataWritableRecordConverter.java index e9d1131..8ad38f2 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/convert/DataWritableRecordConverter.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/convert/DataWritableRecordConverter.java @@ -14,7 +14,7 @@ package org.apache.hadoop.hive.ql.io.parquet.convert; import org.apache.hadoop.hive.ql.io.parquet.read.DataWritableReadSupport; -import org.apache.hadoop.io.ArrayWritable; +import org.apache.hadoop.hive.serde2.io.ObjectArrayWritable; import parquet.io.api.GroupConverter; import parquet.io.api.RecordMaterializer; import parquet.schema.GroupType; @@ -27,7 +27,7 @@ import java.util.Map; * A 
MapWritableReadSupport, encapsulates the tuples * */ -public class DataWritableRecordConverter extends RecordMaterializer<ArrayWritable> { +public class DataWritableRecordConverter extends RecordMaterializer<ObjectArrayWritable> { private final HiveStructConverter root; @@ -37,7 +37,7 @@ public class DataWritableRecordConverter extends RecordMaterializer<ArrayWritabl } @Override - public ArrayWritable getCurrentRecord() { + public ObjectArrayWritable getCurrentRecord() { return root.getCurrentArray(); } http://git-wip-us.apache.org/repos/asf/hive/blob/4157374d/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/convert/ETypeConverter.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/convert/ETypeConverter.java b/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/convert/ETypeConverter.java index 3fc0129..04ded03 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/convert/ETypeConverter.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/convert/ETypeConverter.java @@ -48,12 +48,11 @@ public enum ETypeConverter { EDOUBLE_CONVERTER(Double.TYPE) { @Override - PrimitiveConverter getConverter(final PrimitiveType type, final int index, final ConverterParent parent) { return new PrimitiveConverter() { @Override public void addDouble(final double value) { - parent.set(index, new DoubleWritable(value)); + parent.set(index, value); } }; } @@ -64,7 +63,7 @@ public enum ETypeConverter { return new PrimitiveConverter() { @Override public void addBoolean(final boolean value) { - parent.set(index, new BooleanWritable(value)); + parent.set(index, value); } }; } @@ -75,7 +74,7 @@ public enum ETypeConverter { return new PrimitiveConverter() { @Override public void addFloat(final float value) { - parent.set(index, new FloatWritable(value)); + parent.set(index, value); } }; } @@ -86,7 +85,7 @@ public enum ETypeConverter { return new PrimitiveConverter() { @Override public void addInt(final int value) { - parent.set(index, new IntWritable(value)); + parent.set(index, value); } }; } @@ -97,7 +96,7 @@ public enum ETypeConverter { return new PrimitiveConverter() { @Override public void addLong(final long value) { - parent.set(index, new LongWritable(value)); + parent.set(index, value); } }; } http://git-wip-us.apache.org/repos/asf/hive/blob/4157374d/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/convert/HiveCollectionConverter.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/convert/HiveCollectionConverter.java b/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/convert/HiveCollectionConverter.java index 0fd538e..cd3fd6e 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/convert/HiveCollectionConverter.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/convert/HiveCollectionConverter.java @@ -20,11 +20,11 @@ package org.apache.hadoop.hive.ql.io.parquet.convert; import com.google.common.base.Preconditions; import java.util.ArrayList; +import java.util.Arrays; import java.util.List; import java.util.Map; -import org.apache.hadoop.io.ArrayWritable; -import org.apache.hadoop.io.Writable; +import org.apache.hadoop.hive.serde2.io.ObjectArrayWritable; import parquet.io.api.Converter; import parquet.schema.GroupType; import parquet.schema.Type; @@ -34,7 +34,7 @@ public class HiveCollectionConverter extends HiveGroupConverter { private final ConverterParent parent; private final int index; private final Converter 
innerConverter; - private final List<Writable> list = new ArrayList<Writable>(); + private final List<Object> list = new ArrayList<Object>(); public static HiveGroupConverter forMap(GroupType mapType, ConverterParent parent, @@ -83,12 +83,11 @@ public class HiveCollectionConverter extends HiveGroupConverter { @Override public void end() { - parent.set(index, wrapList(new ArrayWritable( - Writable.class, list.toArray(new Writable[list.size()])))); + parent.set(index, new ObjectArrayWritable(list.toArray())); } @Override - public void set(int index, Writable value) { + public void set(int index, Object value) { list.add(value); } @@ -96,7 +95,7 @@ public class HiveCollectionConverter extends HiveGroupConverter { private final HiveGroupConverter parent; private final Converter keyConverter; private final Converter valueConverter; - private Writable[] keyValue = null; + private Object[] keyValue = new Object[2]; public KeyValueConverter(GroupType keyValueType, HiveGroupConverter parent) { setMetadata(parent.getMetadata()); @@ -108,7 +107,7 @@ public class HiveCollectionConverter extends HiveGroupConverter { } @Override - public void set(int fieldIndex, Writable value) { + public void set(int fieldIndex, Object value) { keyValue[fieldIndex] = value; } @@ -127,19 +126,19 @@ public class HiveCollectionConverter extends HiveGroupConverter { @Override public void start() { - this.keyValue = new Writable[2]; + Arrays.fill(keyValue, null); } @Override public void end() { - parent.set(0, new ArrayWritable(Writable.class, keyValue)); + parent.set(0, new ObjectArrayWritable(keyValue)); } } private static class ElementConverter extends HiveGroupConverter { private final HiveGroupConverter parent; private final Converter elementConverter; - private Writable element = null; + private Object element = null; public ElementConverter(GroupType repeatedType, HiveGroupConverter parent) { setMetadata(parent.getMetadata()); @@ -149,7 +148,7 @@ public class HiveCollectionConverter extends HiveGroupConverter { } @Override - public void set(int index, Writable value) { + public void set(int index, Object value) { this.element = value; } http://git-wip-us.apache.org/repos/asf/hive/blob/4157374d/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/convert/HiveGroupConverter.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/convert/HiveGroupConverter.java b/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/convert/HiveGroupConverter.java index 4809f9b..d516f05 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/convert/HiveGroupConverter.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/convert/HiveGroupConverter.java @@ -81,6 +81,6 @@ public abstract class HiveGroupConverter extends GroupConverter implements Conve return new ArrayWritable(Writable.class, new Writable[] {list}); } - public abstract void set(int index, Writable value); + public abstract void set(int index, Object value); } http://git-wip-us.apache.org/repos/asf/hive/blob/4157374d/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/convert/HiveStructConverter.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/convert/HiveStructConverter.java b/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/convert/HiveStructConverter.java index f95d15e..4d06c36 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/convert/HiveStructConverter.java +++ 
b/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/convert/HiveStructConverter.java @@ -14,11 +14,11 @@ package org.apache.hadoop.hive.ql.io.parquet.convert; import java.util.ArrayList; +import java.util.Arrays; import java.util.List; import java.util.Map; -import org.apache.hadoop.io.ArrayWritable; -import org.apache.hadoop.io.Writable; +import org.apache.hadoop.hive.serde2.io.ObjectArrayWritable; import parquet.io.api.Converter; import parquet.schema.GroupType; import parquet.schema.Type; @@ -34,7 +34,7 @@ public class HiveStructConverter extends HiveGroupConverter { private final Converter[] converters; private final ConverterParent parent; private final int index; - private Writable[] writables; + private Object[] elements; private final List<Repeated> repeatedConverters; private boolean reuseWritableArray = false; @@ -42,7 +42,7 @@ public class HiveStructConverter extends HiveGroupConverter { this(requestedSchema, null, 0, tableSchema); setMetadata(metadata); this.reuseWritableArray = true; - this.writables = new Writable[tableSchema.getFieldCount()]; + this.elements = new Object[tableSchema.getFieldCount()]; } public HiveStructConverter(final GroupType groupType, final ConverterParent parent, @@ -95,13 +95,13 @@ public class HiveStructConverter extends HiveGroupConverter { return converter; } - public final ArrayWritable getCurrentArray() { - return new ArrayWritable(Writable.class, writables); + public final ObjectArrayWritable getCurrentArray() { + return new ObjectArrayWritable(elements); } @Override - public void set(int fieldIndex, Writable value) { - writables[fieldIndex] = value; + public void set(int fieldIndex, Object value) { + elements[fieldIndex] = value; } @Override @@ -113,11 +113,9 @@ public class HiveStructConverter extends HiveGroupConverter { public void start() { if (reuseWritableArray) { // reset the array to null values - for (int i = 0; i < writables.length; i += 1) { - writables[i] = null; - } + Arrays.fill(elements, null); } else { - this.writables = new Writable[totalFieldCount]; + this.elements = new Object[totalFieldCount]; } for (Repeated repeated : repeatedConverters) { repeated.parentStart(); http://git-wip-us.apache.org/repos/asf/hive/blob/4157374d/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/convert/Repeated.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/convert/Repeated.java b/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/convert/Repeated.java index 0130aef..c84caec 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/convert/Repeated.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/convert/Repeated.java @@ -19,12 +19,10 @@ package org.apache.hadoop.hive.ql.io.parquet.convert; import java.util.ArrayList; -import java.util.HashMap; import java.util.List; import java.util.Map; -import org.apache.hadoop.io.ArrayWritable; -import org.apache.hadoop.io.Writable; +import org.apache.hadoop.hive.serde2.io.ObjectArrayWritable; import parquet.column.Dictionary; import parquet.io.api.Binary; import parquet.io.api.Converter; @@ -63,7 +61,7 @@ public interface Repeated extends ConverterParent { private final PrimitiveConverter wrapped; private final ConverterParent parent; private final int index; - private final List<Writable> list = new ArrayList<Writable>(); + private final List<Object> list = new ArrayList<Object>(); public RepeatedPrimitiveConverter(PrimitiveType primitiveType, ConverterParent parent, int index) { 
setMetadata(parent.getMetadata()); @@ -125,12 +123,11 @@ public interface Repeated extends ConverterParent { @Override public void parentEnd() { - parent.set(index, HiveGroupConverter.wrapList(new ArrayWritable( - Writable.class, list.toArray(new Writable[list.size()])))); + parent.set(index, new ObjectArrayWritable(list.toArray())); } @Override - public void set(int index, Writable value) { + public void set(int index, Object value) { list.add(value); } } @@ -141,24 +138,21 @@ public interface Repeated extends ConverterParent { */ class RepeatedGroupConverter extends HiveGroupConverter implements Repeated { - private final GroupType groupType; private final HiveGroupConverter wrapped; private final ConverterParent parent; private final int index; - private final List<Writable> list = new ArrayList<Writable>(); - private final Map<String, String> metadata = new HashMap<String, String>(); + private final List<Object> list = new ArrayList<Object>(); public RepeatedGroupConverter(GroupType groupType, ConverterParent parent, int index) { setMetadata(parent.getMetadata()); - this.groupType = groupType; this.parent = parent; this.index = index; this.wrapped = HiveGroupConverter.getConverterFromDescription(groupType, 0, this); } @Override - public void set(int fieldIndex, Writable value) { + public void set(int fieldIndex, Object value) { list.add(value); } @@ -185,8 +179,7 @@ public interface Repeated extends ConverterParent { @Override public void parentEnd() { - parent.set(index, wrapList(new ArrayWritable( - Writable.class, list.toArray(new Writable[list.size()])))); + parent.set(index, new ObjectArrayWritable(list.toArray())); } } } http://git-wip-us.apache.org/repos/asf/hive/blob/4157374d/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/read/DataWritableReadSupport.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/read/DataWritableReadSupport.java b/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/read/DataWritableReadSupport.java index dcd46bd..71e9550 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/read/DataWritableReadSupport.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/read/DataWritableReadSupport.java @@ -29,7 +29,7 @@ import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector; import org.apache.hadoop.hive.serde2.typeinfo.StructTypeInfo; import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo; import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils; -import org.apache.hadoop.io.ArrayWritable; +import org.apache.hadoop.hive.serde2.io.ObjectArrayWritable; import org.apache.hadoop.util.StringUtils; import parquet.hadoop.api.InitContext; @@ -48,7 +48,7 @@ import parquet.schema.PrimitiveType.PrimitiveTypeName; * Manages the translation between Hive and Parquet * */ -public class DataWritableReadSupport extends ReadSupport<ArrayWritable> { +public class DataWritableReadSupport extends ReadSupport<ObjectArrayWritable> { public static final String HIVE_TABLE_AS_PARQUET_SCHEMA = "HIVE_TABLE_SCHEMA"; public static final String PARQUET_COLUMN_INDEX_ACCESS = "parquet.column.index.access"; @@ -245,7 +245,7 @@ public class DataWritableReadSupport extends ReadSupport<ArrayWritable> { * @return Record Materialize for Hive */ @Override - public RecordMaterializer<ArrayWritable> prepareForRead(final Configuration configuration, + public RecordMaterializer<ObjectArrayWritable> prepareForRead(final Configuration configuration, final Map<String, String> keyValueMetaData, final 
MessageType fileSchema, final parquet.hadoop.api.ReadSupport.ReadContext readContext) { final Map<String, String> metadata = readContext.getReadSupportMetadata(); http://git-wip-us.apache.org/repos/asf/hive/blob/4157374d/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/read/ParquetRecordReaderWrapper.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/read/ParquetRecordReaderWrapper.java b/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/read/ParquetRecordReaderWrapper.java index 5c36564..6341c08 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/read/ParquetRecordReaderWrapper.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/read/ParquetRecordReaderWrapper.java @@ -33,9 +33,8 @@ import org.apache.hadoop.hive.ql.io.parquet.ProjectionPusher; import org.apache.hadoop.hive.ql.io.sarg.SearchArgumentFactory; import org.apache.hadoop.hive.ql.plan.TableScanDesc; import org.apache.hadoop.hive.serde2.ColumnProjectionUtils; -import org.apache.hadoop.io.ArrayWritable; import org.apache.hadoop.io.NullWritable; -import org.apache.hadoop.io.Writable; +import org.apache.hadoop.hive.serde2.io.ObjectArrayWritable; import org.apache.hadoop.mapred.FileSplit; import org.apache.hadoop.mapred.InputSplit; import org.apache.hadoop.mapred.JobConf; @@ -60,15 +59,15 @@ import parquet.schema.MessageTypeParser; import com.google.common.base.Strings; -public class ParquetRecordReaderWrapper implements RecordReader<NullWritable, ArrayWritable> { +public class ParquetRecordReaderWrapper implements RecordReader<NullWritable, ObjectArrayWritable> { public static final Log LOG = LogFactory.getLog(ParquetRecordReaderWrapper.class); private final long splitLen; // for getPos() - private org.apache.hadoop.mapreduce.RecordReader<Void, ArrayWritable> realReader; + private org.apache.hadoop.mapreduce.RecordReader<Void, ObjectArrayWritable> realReader; // expect readReader return same Key & Value objects (common case) // this avoids extra serialization & deserialization of these objects - private ArrayWritable valueObj = null; + private ObjectArrayWritable valueObj = null; private boolean firstRecord = false; private boolean eof = false; private int schemaSize; @@ -78,7 +77,7 @@ public class ParquetRecordReaderWrapper implements RecordReader<NullWritable, A private List<BlockMetaData> filtedBlocks; public ParquetRecordReaderWrapper( - final ParquetInputFormat<ArrayWritable> newInputFormat, + final ParquetInputFormat<ObjectArrayWritable> newInputFormat, final InputSplit oldSplit, final JobConf oldJobConf, final Reporter reporter) @@ -87,7 +86,7 @@ public class ParquetRecordReaderWrapper implements RecordReader<NullWritable, A } public ParquetRecordReaderWrapper( - final ParquetInputFormat<ArrayWritable> newInputFormat, + final ParquetInputFormat<ObjectArrayWritable> newInputFormat, final InputSplit oldSplit, final JobConf oldJobConf, final Reporter reporter, @@ -134,7 +133,7 @@ public class ParquetRecordReaderWrapper implements RecordReader<NullWritable, A eof = true; } if (valueObj == null) { // Should initialize the value for createValue - valueObj = new ArrayWritable(Writable.class, new Writable[schemaSize]); + valueObj = new ObjectArrayWritable(new Object[schemaSize]); } } @@ -174,7 +173,7 @@ public class ParquetRecordReaderWrapper implements RecordReader<NullWritable, A } @Override - public ArrayWritable createValue() { + public ObjectArrayWritable createValue() { return valueObj; } @@ -197,7 +196,7 @@ public class 
ParquetRecordReaderWrapper implements RecordReader<NullWritable, A } @Override - public boolean next(final NullWritable key, final ArrayWritable value) throws IOException { + public boolean next(final NullWritable key, final ObjectArrayWritable value) throws IOException { if (eof) { return false; } @@ -209,10 +208,10 @@ public class ParquetRecordReaderWrapper implements RecordReader<NullWritable, A return false; } - final ArrayWritable tmpCurValue = realReader.getCurrentValue(); + final ObjectArrayWritable tmpCurValue = realReader.getCurrentValue(); if (value != tmpCurValue) { - final Writable[] arrValue = value.get(); - final Writable[] arrCurrent = tmpCurValue.get(); + final Object[] arrValue = value.get(); + final Object[] arrCurrent = tmpCurValue.get(); if (value != null && arrValue.length == arrCurrent.length) { System.arraycopy(arrCurrent, 0, arrValue, 0, arrCurrent.length); } else { http://git-wip-us.apache.org/repos/asf/hive/blob/4157374d/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/serde/AbstractParquetMapInspector.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/serde/AbstractParquetMapInspector.java b/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/serde/AbstractParquetMapInspector.java index 62c61fc..8f68fda 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/serde/AbstractParquetMapInspector.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/serde/AbstractParquetMapInspector.java @@ -17,10 +17,9 @@ import java.util.HashMap; import java.util.LinkedHashMap; import java.util.Map; +import org.apache.hadoop.hive.serde2.io.ObjectArrayWritable; import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector; import org.apache.hadoop.hive.serde2.objectinspector.SettableMapObjectInspector; -import org.apache.hadoop.io.ArrayWritable; -import org.apache.hadoop.io.Writable; public abstract class AbstractParquetMapInspector implements SettableMapObjectInspector { @@ -58,19 +57,19 @@ public abstract class AbstractParquetMapInspector implements SettableMapObjectIn return null; } - if (data instanceof ArrayWritable) { - final Writable[] mapContainer = ((ArrayWritable) data).get(); + if (data instanceof ObjectArrayWritable) { + final Object[] mapContainer = ((ObjectArrayWritable) data).get(); if (mapContainer == null || mapContainer.length == 0) { return null; } - final Writable[] mapArray = ((ArrayWritable) mapContainer[0]).get(); - final Map<Writable, Writable> map = new LinkedHashMap<Writable, Writable>(); + final Object[] mapArray = ((ObjectArrayWritable) mapContainer[0]).get(); + final Map<Object, Object> map = new LinkedHashMap<Object, Object>(); - for (final Writable obj : mapArray) { - final ArrayWritable mapObj = (ArrayWritable) obj; - final Writable[] arr = mapObj.get(); + for (final Object obj : mapArray) { + final ObjectArrayWritable mapObj = (ObjectArrayWritable) obj; + final Object[] arr = mapObj.get(); map.put(arr[0], arr[1]); } @@ -90,13 +89,13 @@ public abstract class AbstractParquetMapInspector implements SettableMapObjectIn return -1; } - if (data instanceof ArrayWritable) { - final Writable[] mapContainer = ((ArrayWritable) data).get(); + if (data instanceof ObjectArrayWritable) { + final Object[] mapContainer = ((ObjectArrayWritable) data).get(); if (mapContainer == null || mapContainer.length == 0) { return -1; } else { - return ((ArrayWritable) mapContainer[0]).get().length; + return ((ObjectArrayWritable) mapContainer[0]).get().length; } } 
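[Editor's note: to make the inspector changes above easier to follow, here is a small illustrative sketch of how a Parquet map reaches the inspectors after this patch: one outer ObjectArrayWritable whose first element wraps the repeated entries, each entry itself an ObjectArrayWritable holding [key, value]. It mirrors the AbstractParquetMapInspector.getMap() logic shown above; the class and method names in the sketch are hypothetical.]

import java.util.LinkedHashMap;
import java.util.Map;

import org.apache.hadoop.hive.serde2.io.ObjectArrayWritable;

class ParquetMapLayoutExample {
  // Layout produced by the patched converters:
  // data wraps [ entries ]; entries wraps [ [key, value], ... ]
  static Map<Object, Object> toJavaMap(final ObjectArrayWritable data) {
    final Object[] mapContainer = data.get();
    if (mapContainer == null || mapContainer.length == 0) {
      return null;
    }
    final Object[] entries = ((ObjectArrayWritable) mapContainer[0]).get();
    final Map<Object, Object> map = new LinkedHashMap<Object, Object>();
    for (final Object entry : entries) {
      final Object[] keyValue = ((ObjectArrayWritable) entry).get(); // [key, value]
      map.put(keyValue[0], keyValue[1]);
    }
    return map;
  }
}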
http://git-wip-us.apache.org/repos/asf/hive/blob/4157374d/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/serde/ArrayWritableObjectInspector.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/serde/ArrayWritableObjectInspector.java b/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/serde/ArrayWritableObjectInspector.java deleted file mode 100644 index 6091882..0000000 --- a/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/serde/ArrayWritableObjectInspector.java +++ /dev/null @@ -1,249 +0,0 @@ -/** - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.hive.ql.io.parquet.serde; - -import java.util.ArrayList; -import java.util.Arrays; -import java.util.HashMap; -import java.util.List; - -import org.apache.hadoop.hive.ql.io.parquet.serde.primitive.ParquetPrimitiveInspectorFactory; -import org.apache.hadoop.hive.serde.serdeConstants; -import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector; -import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector; -import org.apache.hadoop.hive.serde2.objectinspector.SettableStructObjectInspector; -import org.apache.hadoop.hive.serde2.objectinspector.StructField; -import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory; -import org.apache.hadoop.hive.serde2.typeinfo.CharTypeInfo; -import org.apache.hadoop.hive.serde2.typeinfo.DecimalTypeInfo; -import org.apache.hadoop.hive.serde2.typeinfo.ListTypeInfo; -import org.apache.hadoop.hive.serde2.typeinfo.MapTypeInfo; -import org.apache.hadoop.hive.serde2.typeinfo.StructTypeInfo; -import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo; -import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory; -import org.apache.hadoop.hive.serde2.typeinfo.VarcharTypeInfo; -import org.apache.hadoop.io.ArrayWritable; - -/** - * - * The ArrayWritableObjectInspector will inspect an ArrayWritable, considering it as a Hive struct.<br /> - * It can also inspect a List if Hive decides to inspect the result of an inspection. 
- * - */ -public class ArrayWritableObjectInspector extends SettableStructObjectInspector { - - private final TypeInfo typeInfo; - private final List<TypeInfo> fieldInfos; - private final List<String> fieldNames; - private final List<StructField> fields; - private final HashMap<String, StructFieldImpl> fieldsByName; - - public ArrayWritableObjectInspector(final StructTypeInfo rowTypeInfo) { - - typeInfo = rowTypeInfo; - fieldNames = rowTypeInfo.getAllStructFieldNames(); - fieldInfos = rowTypeInfo.getAllStructFieldTypeInfos(); - fields = new ArrayList<StructField>(fieldNames.size()); - fieldsByName = new HashMap<String, StructFieldImpl>(); - - for (int i = 0; i < fieldNames.size(); ++i) { - final String name = fieldNames.get(i); - final TypeInfo fieldInfo = fieldInfos.get(i); - - final StructFieldImpl field = new StructFieldImpl(name, getObjectInspector(fieldInfo), i); - fields.add(field); - fieldsByName.put(name, field); - } - } - - private ObjectInspector getObjectInspector(final TypeInfo typeInfo) { - if (typeInfo.equals(TypeInfoFactory.doubleTypeInfo)) { - return PrimitiveObjectInspectorFactory.writableDoubleObjectInspector; - } else if (typeInfo.equals(TypeInfoFactory.booleanTypeInfo)) { - return PrimitiveObjectInspectorFactory.writableBooleanObjectInspector; - } else if (typeInfo.equals(TypeInfoFactory.floatTypeInfo)) { - return PrimitiveObjectInspectorFactory.writableFloatObjectInspector; - } else if (typeInfo.equals(TypeInfoFactory.intTypeInfo)) { - return PrimitiveObjectInspectorFactory.writableIntObjectInspector; - } else if (typeInfo.equals(TypeInfoFactory.longTypeInfo)) { - return PrimitiveObjectInspectorFactory.writableLongObjectInspector; - } else if (typeInfo.equals(TypeInfoFactory.stringTypeInfo)) { - return ParquetPrimitiveInspectorFactory.parquetStringInspector; - } else if (typeInfo instanceof DecimalTypeInfo) { - return PrimitiveObjectInspectorFactory.getPrimitiveWritableObjectInspector((DecimalTypeInfo) typeInfo); - } else if (typeInfo.getCategory().equals(Category.STRUCT)) { - return new ArrayWritableObjectInspector((StructTypeInfo) typeInfo); - } else if (typeInfo.getCategory().equals(Category.LIST)) { - final TypeInfo subTypeInfo = ((ListTypeInfo) typeInfo).getListElementTypeInfo(); - return new ParquetHiveArrayInspector(getObjectInspector(subTypeInfo)); - } else if (typeInfo.getCategory().equals(Category.MAP)) { - final TypeInfo keyTypeInfo = ((MapTypeInfo) typeInfo).getMapKeyTypeInfo(); - final TypeInfo valueTypeInfo = ((MapTypeInfo) typeInfo).getMapValueTypeInfo(); - if (keyTypeInfo.equals(TypeInfoFactory.stringTypeInfo) || keyTypeInfo.equals(TypeInfoFactory.byteTypeInfo) - || keyTypeInfo.equals(TypeInfoFactory.shortTypeInfo)) { - return new DeepParquetHiveMapInspector(getObjectInspector(keyTypeInfo), getObjectInspector(valueTypeInfo)); - } else { - return new StandardParquetHiveMapInspector(getObjectInspector(keyTypeInfo), getObjectInspector(valueTypeInfo)); - } - } else if (typeInfo.equals(TypeInfoFactory.byteTypeInfo)) { - return ParquetPrimitiveInspectorFactory.parquetByteInspector; - } else if (typeInfo.equals(TypeInfoFactory.shortTypeInfo)) { - return ParquetPrimitiveInspectorFactory.parquetShortInspector; - } else if (typeInfo.equals(TypeInfoFactory.timestampTypeInfo)) { - return PrimitiveObjectInspectorFactory.writableTimestampObjectInspector; - } else if (typeInfo.equals(TypeInfoFactory.binaryTypeInfo)){ - return PrimitiveObjectInspectorFactory.writableBinaryObjectInspector; - }else if (typeInfo.equals(TypeInfoFactory.dateTypeInfo)) { - return 
PrimitiveObjectInspectorFactory.writableDateObjectInspector; - } else if (typeInfo.getTypeName().toLowerCase().startsWith(serdeConstants.CHAR_TYPE_NAME)) { - return PrimitiveObjectInspectorFactory.getPrimitiveWritableObjectInspector((CharTypeInfo) typeInfo); - } else if (typeInfo.getTypeName().toLowerCase().startsWith(serdeConstants.VARCHAR_TYPE_NAME)) { - return PrimitiveObjectInspectorFactory.getPrimitiveWritableObjectInspector((VarcharTypeInfo) typeInfo); - } else { - throw new UnsupportedOperationException("Unknown field type: " + typeInfo); - } - - } - - @Override - public Category getCategory() { - return Category.STRUCT; - } - - @Override - public String getTypeName() { - return typeInfo.getTypeName(); - } - - @Override - public List<? extends StructField> getAllStructFieldRefs() { - return fields; - } - - @Override - public Object getStructFieldData(final Object data, final StructField fieldRef) { - if (data == null) { - return null; - } - - if (data instanceof ArrayWritable) { - final ArrayWritable arr = (ArrayWritable) data; - return arr.get()[((StructFieldImpl) fieldRef).getIndex()]; - } - - //since setStructFieldData and create return a list, getStructFieldData should be able to - //handle list data. This is required when table serde is ParquetHiveSerDe and partition serde - //is something else. - if (data instanceof List) { - return ((List) data).get(((StructFieldImpl) fieldRef).getIndex()); - } - - throw new UnsupportedOperationException("Cannot inspect " + data.getClass().getCanonicalName()); - } - - @Override - public StructField getStructFieldRef(final String name) { - return fieldsByName.get(name); - } - - @Override - public List<Object> getStructFieldsDataAsList(final Object data) { - if (data == null) { - return null; - } - - if (data instanceof ArrayWritable) { - final ArrayWritable arr = (ArrayWritable) data; - final Object[] arrWritable = arr.get(); - return new ArrayList<Object>(Arrays.asList(arrWritable)); - } - - throw new UnsupportedOperationException("Cannot inspect " + data.getClass().getCanonicalName()); - } - - @Override - public Object create() { - final ArrayList<Object> list = new ArrayList<Object>(fields.size()); - for (int i = 0; i < fields.size(); ++i) { - list.add(null); - } - return list; - } - - @Override - public Object setStructFieldData(Object struct, StructField field, Object fieldValue) { - final ArrayList<Object> list = (ArrayList<Object>) struct; - list.set(((StructFieldImpl) field).getIndex(), fieldValue); - return list; - } - - @Override - public boolean equals(Object obj) { - if (obj == null) { - return false; - } - if (getClass() != obj.getClass()) { - return false; - } - final ArrayWritableObjectInspector other = (ArrayWritableObjectInspector) obj; - if (this.typeInfo != other.typeInfo && (this.typeInfo == null || !this.typeInfo.equals(other.typeInfo))) { - return false; - } - return true; - } - - @Override - public int hashCode() { - int hash = 5; - hash = 29 * hash + (this.typeInfo != null ? 
this.typeInfo.hashCode() : 0); - return hash; - } - - class StructFieldImpl implements StructField { - - private final String name; - private final ObjectInspector inspector; - private final int index; - - public StructFieldImpl(final String name, final ObjectInspector inspector, final int index) { - this.name = name; - this.inspector = inspector; - this.index = index; - } - - @Override - public String getFieldComment() { - return ""; - } - - @Override - public String getFieldName() { - return name; - } - - public int getIndex() { - return index; - } - - @Override - public ObjectInspector getFieldObjectInspector() { - return inspector; - } - - @Override - public int getFieldID() { - return index; - } - } -} http://git-wip-us.apache.org/repos/asf/hive/blob/4157374d/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/serde/DeepParquetHiveMapInspector.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/serde/DeepParquetHiveMapInspector.java b/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/serde/DeepParquetHiveMapInspector.java index d38c641..80fdb22 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/serde/DeepParquetHiveMapInspector.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/serde/DeepParquetHiveMapInspector.java @@ -15,10 +15,9 @@ package org.apache.hadoop.hive.ql.io.parquet.serde; import java.util.Map; +import org.apache.hadoop.hive.serde2.io.ObjectArrayWritable; import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector; import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector; -import org.apache.hadoop.io.ArrayWritable; -import org.apache.hadoop.io.Writable; /** * The DeepParquetHiveMapInspector will inspect an ArrayWritable, considering it as a Hive map.<br /> @@ -39,18 +38,18 @@ public class DeepParquetHiveMapInspector extends AbstractParquetMapInspector { return null; } - if (data instanceof ArrayWritable) { - final Writable[] mapContainer = ((ArrayWritable) data).get(); + if (data instanceof ObjectArrayWritable) { + final Object[] mapContainer = ((ObjectArrayWritable) data).get(); if (mapContainer == null || mapContainer.length == 0) { return null; } - final Writable[] mapArray = ((ArrayWritable) mapContainer[0]).get(); + final Object[] mapArray = ((ObjectArrayWritable) mapContainer[0]).get(); - for (final Writable obj : mapArray) { - final ArrayWritable mapObj = (ArrayWritable) obj; - final Writable[] arr = mapObj.get(); + for (final Object obj : mapArray) { + final ObjectArrayWritable mapObj = (ObjectArrayWritable) obj; + final Object[] arr = mapObj.get(); if (key.equals(arr[0]) || key.equals(((PrimitiveObjectInspector) keyInspector).getPrimitiveJavaObject(arr[0])) || key.equals(((PrimitiveObjectInspector) keyInspector).getPrimitiveWritableObject(arr[0]))) { return arr[1]; http://git-wip-us.apache.org/repos/asf/hive/blob/4157374d/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/serde/ObjectArrayWritableObjectInspector.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/serde/ObjectArrayWritableObjectInspector.java b/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/serde/ObjectArrayWritableObjectInspector.java new file mode 100644 index 0000000..571f993 --- /dev/null +++ b/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/serde/ObjectArrayWritableObjectInspector.java @@ -0,0 +1,248 @@ +/** + * Licensed under the Apache License, Version 2.0 (the "License"); + 
* you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hive.ql.io.parquet.serde; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.HashMap; +import java.util.List; + +import org.apache.hadoop.hive.ql.io.parquet.serde.primitive.ParquetPrimitiveInspectorFactory; +import org.apache.hadoop.hive.serde.serdeConstants; +import org.apache.hadoop.hive.serde2.io.ObjectArrayWritable; +import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector; +import org.apache.hadoop.hive.serde2.objectinspector.SettableStructObjectInspector; +import org.apache.hadoop.hive.serde2.objectinspector.StructField; +import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory; +import org.apache.hadoop.hive.serde2.typeinfo.CharTypeInfo; +import org.apache.hadoop.hive.serde2.typeinfo.DecimalTypeInfo; +import org.apache.hadoop.hive.serde2.typeinfo.ListTypeInfo; +import org.apache.hadoop.hive.serde2.typeinfo.MapTypeInfo; +import org.apache.hadoop.hive.serde2.typeinfo.StructTypeInfo; +import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo; +import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory; +import org.apache.hadoop.hive.serde2.typeinfo.VarcharTypeInfo; + +/** + * + * The ObjectArrayWritableObjectInspector will inspect an ObjectArrayWritable, considering it as a Hive struct.<br /> + * It can also inspect a List if Hive decides to inspect the result of an inspection.
+ * + */ +public class ObjectArrayWritableObjectInspector extends SettableStructObjectInspector { + + private final TypeInfo typeInfo; + private final List<TypeInfo> fieldInfos; + private final List<String> fieldNames; + private final List<StructField> fields; + private final HashMap<String, StructFieldImpl> fieldsByName; + + public ObjectArrayWritableObjectInspector(final StructTypeInfo rowTypeInfo) { + + typeInfo = rowTypeInfo; + fieldNames = rowTypeInfo.getAllStructFieldNames(); + fieldInfos = rowTypeInfo.getAllStructFieldTypeInfos(); + fields = new ArrayList<StructField>(fieldNames.size()); + fieldsByName = new HashMap<String, StructFieldImpl>(); + + for (int i = 0; i < fieldNames.size(); ++i) { + final String name = fieldNames.get(i); + final TypeInfo fieldInfo = fieldInfos.get(i); + + final StructFieldImpl field = new StructFieldImpl(name, getObjectInspector(fieldInfo), i); + fields.add(field); + fieldsByName.put(name, field); + } + } + + private ObjectInspector getObjectInspector(final TypeInfo typeInfo) { + if (typeInfo.equals(TypeInfoFactory.doubleTypeInfo)) { + return PrimitiveObjectInspectorFactory.javaDoubleObjectInspector; + } else if (typeInfo.equals(TypeInfoFactory.booleanTypeInfo)) { + return PrimitiveObjectInspectorFactory.javaBooleanObjectInspector; + } else if (typeInfo.equals(TypeInfoFactory.floatTypeInfo)) { + return PrimitiveObjectInspectorFactory.javaFloatObjectInspector; + } else if (typeInfo.equals(TypeInfoFactory.intTypeInfo)) { + return PrimitiveObjectInspectorFactory.javaIntObjectInspector; + } else if (typeInfo.equals(TypeInfoFactory.longTypeInfo)) { + return PrimitiveObjectInspectorFactory.javaLongObjectInspector; + } else if (typeInfo.equals(TypeInfoFactory.stringTypeInfo)) { + return ParquetPrimitiveInspectorFactory.parquetStringInspector; + } else if (typeInfo instanceof DecimalTypeInfo) { + return PrimitiveObjectInspectorFactory.getPrimitiveWritableObjectInspector((DecimalTypeInfo) typeInfo); + } else if (typeInfo.getCategory().equals(Category.STRUCT)) { + return new ObjectArrayWritableObjectInspector((StructTypeInfo) typeInfo); + } else if (typeInfo.getCategory().equals(Category.LIST)) { + final TypeInfo subTypeInfo = ((ListTypeInfo) typeInfo).getListElementTypeInfo(); + return new ParquetHiveArrayInspector(getObjectInspector(subTypeInfo)); + } else if (typeInfo.getCategory().equals(Category.MAP)) { + final TypeInfo keyTypeInfo = ((MapTypeInfo) typeInfo).getMapKeyTypeInfo(); + final TypeInfo valueTypeInfo = ((MapTypeInfo) typeInfo).getMapValueTypeInfo(); + if (keyTypeInfo.equals(TypeInfoFactory.stringTypeInfo) || keyTypeInfo.equals(TypeInfoFactory.byteTypeInfo) + || keyTypeInfo.equals(TypeInfoFactory.shortTypeInfo)) { + return new DeepParquetHiveMapInspector(getObjectInspector(keyTypeInfo), getObjectInspector(valueTypeInfo)); + } else { + return new StandardParquetHiveMapInspector(getObjectInspector(keyTypeInfo), getObjectInspector(valueTypeInfo)); + } + } else if (typeInfo.equals(TypeInfoFactory.byteTypeInfo)) { + return ParquetPrimitiveInspectorFactory.parquetByteInspector; + } else if (typeInfo.equals(TypeInfoFactory.shortTypeInfo)) { + return ParquetPrimitiveInspectorFactory.parquetShortInspector; + } else if (typeInfo.equals(TypeInfoFactory.timestampTypeInfo)) { + return PrimitiveObjectInspectorFactory.writableTimestampObjectInspector; + } else if (typeInfo.equals(TypeInfoFactory.binaryTypeInfo)){ + return PrimitiveObjectInspectorFactory.writableBinaryObjectInspector; + }else if (typeInfo.equals(TypeInfoFactory.dateTypeInfo)) { + return 
PrimitiveObjectInspectorFactory.writableDateObjectInspector; + } else if (typeInfo.getTypeName().toLowerCase().startsWith(serdeConstants.CHAR_TYPE_NAME)) { + return PrimitiveObjectInspectorFactory.getPrimitiveWritableObjectInspector((CharTypeInfo) typeInfo); + } else if (typeInfo.getTypeName().toLowerCase().startsWith(serdeConstants.VARCHAR_TYPE_NAME)) { + return PrimitiveObjectInspectorFactory.getPrimitiveWritableObjectInspector((VarcharTypeInfo) typeInfo); + } else { + throw new UnsupportedOperationException("Unknown field type: " + typeInfo); + } + + } + + @Override + public Category getCategory() { + return Category.STRUCT; + } + + @Override + public String getTypeName() { + return typeInfo.getTypeName(); + } + + @Override + public List<? extends StructField> getAllStructFieldRefs() { + return fields; + } + + @Override + public Object getStructFieldData(final Object data, final StructField fieldRef) { + if (data == null) { + return null; + } + + if (data instanceof ObjectArrayWritable) { + final ObjectArrayWritable arr = (ObjectArrayWritable) data; + return arr.get()[((StructFieldImpl) fieldRef).getIndex()]; + } + + //since setStructFieldData and create return a list, getStructFieldData should be able to + //handle list data. This is required when table serde is ParquetHiveSerDe and partition serde + //is something else. + if (data instanceof List) { + return ((List) data).get(((StructFieldImpl) fieldRef).getIndex()); + } + + throw new UnsupportedOperationException("Cannot inspect " + data.getClass().getCanonicalName()); + } + + @Override + public StructField getStructFieldRef(final String name) { + return fieldsByName.get(name); + } + + @Override + public List<Object> getStructFieldsDataAsList(final Object data) { + if (data == null) { + return null; + } + + if (data instanceof ObjectArrayWritable) { + final ObjectArrayWritable arr = (ObjectArrayWritable) data; + final Object[] arrObjects = arr.get(); + return Arrays.asList(arrObjects); + } + + throw new UnsupportedOperationException("Cannot inspect " + data.getClass().getCanonicalName()); + } + + @Override + public Object create() { + final ArrayList<Object> list = new ArrayList<Object>(fields.size()); + for (int i = 0; i < fields.size(); ++i) { + list.add(null); + } + return list; + } + + @Override + public Object setStructFieldData(Object struct, StructField field, Object fieldValue) { + final ArrayList<Object> list = (ArrayList<Object>) struct; + list.set(((StructFieldImpl) field).getIndex(), fieldValue); + return list; + } + + @Override + public boolean equals(Object obj) { + if (obj == null) { + return false; + } + if (getClass() != obj.getClass()) { + return false; + } + final ObjectArrayWritableObjectInspector other = (ObjectArrayWritableObjectInspector) obj; + if (this.typeInfo != other.typeInfo && (this.typeInfo == null || !this.typeInfo.equals(other.typeInfo))) { + return false; + } + return true; + } + + @Override + public int hashCode() { + int hash = 5; + hash = 29 * hash + (this.typeInfo != null ? 
this.typeInfo.hashCode() : 0); + return hash; + } + + class StructFieldImpl implements StructField { + + private final String name; + private final ObjectInspector inspector; + private final int index; + + public StructFieldImpl(final String name, final ObjectInspector inspector, final int index) { + this.name = name; + this.inspector = inspector; + this.index = index; + } + + @Override + public String getFieldComment() { + return ""; + } + + @Override + public String getFieldName() { + return name; + } + + public int getIndex() { + return index; + } + + @Override + public ObjectInspector getFieldObjectInspector() { + return inspector; + } + + @Override + public int getFieldID() { + return index; + } + } +} http://git-wip-us.apache.org/repos/asf/hive/blob/4157374d/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/serde/ParquetHiveArrayInspector.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/serde/ParquetHiveArrayInspector.java b/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/serde/ParquetHiveArrayInspector.java index 53ca31d..e276359 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/serde/ParquetHiveArrayInspector.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/serde/ParquetHiveArrayInspector.java @@ -16,10 +16,11 @@ package org.apache.hadoop.hive.ql.io.parquet.serde; import java.util.ArrayList; import java.util.List; +import org.apache.hadoop.hive.serde2.io.ObjectArrayWritable; import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector; import org.apache.hadoop.hive.serde2.objectinspector.SettableListObjectInspector; -import org.apache.hadoop.io.ArrayWritable; -import org.apache.hadoop.io.Writable; + +import java.util.Arrays; /** * The ParquetHiveArrayInspector will inspect an ArrayWritable, considering it as an Hive array.<br /> @@ -55,21 +56,21 @@ public class ParquetHiveArrayInspector implements SettableListObjectInspector { return null; } - if (data instanceof ArrayWritable) { - final Writable[] listContainer = ((ArrayWritable) data).get(); + if (data instanceof ObjectArrayWritable) { + final Object[] listContainer = ((ObjectArrayWritable) data).get(); if (listContainer == null || listContainer.length == 0) { return null; } - final Writable subObj = listContainer[0]; + final Object subObj = listContainer[0]; if (subObj == null) { return null; } - if (index >= 0 && index < ((ArrayWritable) subObj).get().length) { - return ((ArrayWritable) subObj).get()[index]; + if (index >= 0 && index < ((ObjectArrayWritable) subObj).get().length) { + return ((ObjectArrayWritable) subObj).get()[index]; } else { return null; } @@ -84,20 +85,20 @@ public class ParquetHiveArrayInspector implements SettableListObjectInspector { return -1; } - if (data instanceof ArrayWritable) { - final Writable[] listContainer = ((ArrayWritable) data).get(); + if (data instanceof ObjectArrayWritable) { + final Object[] listContainer = ((ObjectArrayWritable) data).get(); if (listContainer == null || listContainer.length == 0) { return -1; } - final Writable subObj = listContainer[0]; + final Object subObj = listContainer[0]; if (subObj == null) { return 0; } - return ((ArrayWritable) subObj).get().length; + return ((ObjectArrayWritable) subObj).get().length; } throw new UnsupportedOperationException("Cannot inspect " + data.getClass().getCanonicalName()); @@ -109,23 +110,23 @@ public class ParquetHiveArrayInspector implements SettableListObjectInspector { return null; } - if (data instanceof 
ArrayWritable) { - final Writable[] listContainer = ((ArrayWritable) data).get(); + if (data instanceof ObjectArrayWritable) { + final Object[] listContainer = ((ObjectArrayWritable) data).get(); if (listContainer == null || listContainer.length == 0) { return null; } - final Writable subObj = listContainer[0]; + final Object subObj = listContainer[0]; if (subObj == null) { return null; } - final Writable[] array = ((ArrayWritable) subObj).get(); - final List<Writable> list = new ArrayList<Writable>(); + final Object[] array = ((ObjectArrayWritable) subObj).get(); + final List<Object> list = new ArrayList<Object>(); - for (final Writable obj : array) { + for (final Object obj : array) { list.add(obj); } http://git-wip-us.apache.org/repos/asf/hive/blob/4157374d/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/serde/ParquetHiveSerDe.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/serde/ParquetHiveSerDe.java b/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/serde/ParquetHiveSerDe.java index 7fd5e96..dc4f896 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/serde/ParquetHiveSerDe.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/serde/ParquetHiveSerDe.java @@ -24,6 +24,7 @@ import org.apache.hadoop.hive.serde2.AbstractSerDe; import org.apache.hadoop.hive.serde2.SerDeException; import org.apache.hadoop.hive.serde2.SerDeSpec; import org.apache.hadoop.hive.serde2.SerDeStats; +import org.apache.hadoop.hive.serde2.io.ObjectArrayWritable; import org.apache.hadoop.hive.serde2.io.ParquetHiveRecord; import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector; import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector.Category; @@ -33,7 +34,6 @@ import org.apache.hadoop.hive.serde2.typeinfo.StructTypeInfo; import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo; import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory; import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils; -import org.apache.hadoop.io.ArrayWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.io.Writable; import parquet.hadoop.ParquetOutputFormat; @@ -118,7 +118,7 @@ public class ParquetHiveSerDe extends AbstractSerDe { } // Create row related objects rowTypeInfo = TypeInfoFactory.getStructTypeInfo(columnNames, columnTypes); - this.objInspector = new ArrayWritableObjectInspector((StructTypeInfo) rowTypeInfo); + this.objInspector = new ObjectArrayWritableObjectInspector((StructTypeInfo) rowTypeInfo); // Stats part serializedSize = 0; @@ -130,8 +130,8 @@ public class ParquetHiveSerDe extends AbstractSerDe { public Object deserialize(final Writable blob) throws SerDeException { status = LAST_OPERATION.DESERIALIZE; deserializedSize = 0; - if (blob instanceof ArrayWritable) { - deserializedSize = ((ArrayWritable) blob).get().length; + if (blob instanceof ObjectArrayWritable) { + deserializedSize = ((ObjectArrayWritable) blob).get().length; return blob; } else { return null; http://git-wip-us.apache.org/repos/asf/hive/blob/4157374d/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/serde/StandardParquetHiveMapInspector.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/serde/StandardParquetHiveMapInspector.java b/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/serde/StandardParquetHiveMapInspector.java index 5aa1448..0ee7e2c 100644 --- 
a/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/serde/StandardParquetHiveMapInspector.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/serde/StandardParquetHiveMapInspector.java @@ -15,9 +15,8 @@ package org.apache.hadoop.hive.ql.io.parquet.serde; import java.util.Map; +import org.apache.hadoop.hive.serde2.io.ObjectArrayWritable; import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector; -import org.apache.hadoop.io.ArrayWritable; -import org.apache.hadoop.io.Writable; /** * The StandardParquetHiveMapInspector will inspect an ArrayWritable, considering it as a Hive map.<br /> @@ -36,16 +35,16 @@ public class StandardParquetHiveMapInspector extends AbstractParquetMapInspector if (data == null || key == null) { return null; } - if (data instanceof ArrayWritable) { - final Writable[] mapContainer = ((ArrayWritable) data).get(); + if (data instanceof ObjectArrayWritable) { + final Object[] mapContainer = ((ObjectArrayWritable) data).get(); if (mapContainer == null || mapContainer.length == 0) { return null; } - final Writable[] mapArray = ((ArrayWritable) mapContainer[0]).get(); - for (final Writable obj : mapArray) { - final ArrayWritable mapObj = (ArrayWritable) obj; - final Writable[] arr = mapObj.get(); + final Object[] mapArray = ((ObjectArrayWritable) mapContainer[0]).get(); + for (final Object obj : mapArray) { + final ObjectArrayWritable mapObj = (ObjectArrayWritable) obj; + final Object[] arr = mapObj.get(); if (key.equals(arr[0])) { return arr[1]; } http://git-wip-us.apache.org/repos/asf/hive/blob/4157374d/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/serde/primitive/ParquetByteInspector.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/serde/primitive/ParquetByteInspector.java b/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/serde/primitive/ParquetByteInspector.java index 864f562..b31f85c 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/serde/primitive/ParquetByteInspector.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/serde/primitive/ParquetByteInspector.java @@ -52,9 +52,10 @@ public class ParquetByteInspector extends AbstractPrimitiveJavaObjectInspector i @Override public byte get(Object o) { - // Accept int writables and convert them. if (o instanceof IntWritable) { return (byte) ((IntWritable) o).get(); + } else if (o instanceof Integer) { + return ((Integer) o).byteValue(); } return ((ByteWritable) o).get(); http://git-wip-us.apache.org/repos/asf/hive/blob/4157374d/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/serde/primitive/ParquetShortInspector.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/serde/primitive/ParquetShortInspector.java b/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/serde/primitive/ParquetShortInspector.java index 39f2657..0acf350 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/serde/primitive/ParquetShortInspector.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/serde/primitive/ParquetShortInspector.java @@ -55,6 +55,8 @@ public class ParquetShortInspector extends AbstractPrimitiveJavaObjectInspector // Accept int writables and convert them. 
if (o instanceof IntWritable) { return (short) ((IntWritable) o).get(); + } else if (o instanceof Integer) { + return ((Integer) o).shortValue(); } return ((ShortWritable) o).get(); http://git-wip-us.apache.org/repos/asf/hive/blob/4157374d/ql/src/test/org/apache/hadoop/hive/ql/io/parquet/AbstractTestParquetDirect.java ---------------------------------------------------------------------- diff --git a/ql/src/test/org/apache/hadoop/hive/ql/io/parquet/AbstractTestParquetDirect.java b/ql/src/test/org/apache/hadoop/hive/ql/io/parquet/AbstractTestParquetDirect.java index 94a780d..9c4cf5c 100644 --- a/ql/src/test/org/apache/hadoop/hive/ql/io/parquet/AbstractTestParquetDirect.java +++ b/ql/src/test/org/apache/hadoop/hive/ql/io/parquet/AbstractTestParquetDirect.java @@ -32,7 +32,7 @@ import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe; import org.apache.hadoop.hive.serde.serdeConstants; -import org.apache.hadoop.io.ArrayWritable; +import org.apache.hadoop.hive.serde2.io.ObjectArrayWritable; import org.apache.hadoop.io.NullWritable; import org.apache.hadoop.io.Writable; import org.apache.hadoop.mapred.FileSplit; @@ -109,45 +109,45 @@ public abstract class AbstractTestParquetDirect { return path; } - public static ArrayWritable record(Writable... fields) { - return new ArrayWritable(Writable.class, fields); + public static ObjectArrayWritable record(Object... fields) { + return new ObjectArrayWritable(fields); } - public static ArrayWritable list(Writable... elements) { + public static ObjectArrayWritable list(Object... elements) { // the ObjectInspector for array<?> and map<?, ?> expects an extra layer - return new ArrayWritable(ArrayWritable.class, new ArrayWritable[] { - new ArrayWritable(Writable.class, elements) + return new ObjectArrayWritable(new Object[] { + new ObjectArrayWritable(elements) }); } - public static String toString(ArrayWritable arrayWritable) { - Writable[] writables = arrayWritable.get(); - String[] strings = new String[writables.length]; - for (int i = 0; i < writables.length; i += 1) { - if (writables[i] instanceof ArrayWritable) { - strings[i] = toString((ArrayWritable) writables[i]); + public static String toString(ObjectArrayWritable arrayWritable) { + Object[] elements = arrayWritable.get(); + String[] strings = new String[elements.length]; + for (int i = 0; i < elements.length; i += 1) { + if (elements[i] instanceof ObjectArrayWritable) { + strings[i] = toString((ObjectArrayWritable) elements[i]); } else { - strings[i] = String.valueOf(writables[i]); + strings[i] = String.valueOf(elements[i]); } } return Arrays.toString(strings); } - public static void assertEquals(String message, ArrayWritable expected, - ArrayWritable actual) { + public static void assertEquals(String message, ObjectArrayWritable expected, + ObjectArrayWritable actual) { Assert.assertEquals(message, toString(expected), toString(actual)); } - public static List<ArrayWritable> read(Path parquetFile) throws IOException { - List<ArrayWritable> records = new ArrayList<ArrayWritable>(); + public static List<ObjectArrayWritable> read(Path parquetFile) throws IOException { + List<ObjectArrayWritable> records = new ArrayList<ObjectArrayWritable>(); - RecordReader<NullWritable, ArrayWritable> reader = new MapredParquetInputFormat(). + RecordReader<NullWritable, ObjectArrayWritable> reader = new MapredParquetInputFormat(). 
getRecordReader(new FileSplit( parquetFile, 0, fileLength(parquetFile), (String[]) null), new JobConf(), null); NullWritable alwaysNull = reader.createKey(); - ArrayWritable record = reader.createValue(); + ObjectArrayWritable record = reader.createValue(); while (reader.next(alwaysNull, record)) { records.add(record); record = reader.createValue(); // a new value so the last isn't clobbered
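
For context, a minimal sketch (not part of this patch) of how the new ObjectArrayWritable-backed inspector is exercised: ParquetHiveSerDe.initialize() now builds an ObjectArrayWritableObjectInspector from the row's StructTypeInfo, and deserialize() hands back an ObjectArrayWritable whose elements are plain Objects. The two-column schema ("name", "age"), the class name ObjectArrayWritableSketch, and the literal row values below are hypothetical, chosen only to illustrate the API surface introduced above.

import java.util.Arrays;

import org.apache.hadoop.hive.ql.io.parquet.serde.ObjectArrayWritableObjectInspector;
import org.apache.hadoop.hive.serde2.io.ObjectArrayWritable;
import org.apache.hadoop.hive.serde2.objectinspector.StructField;
import org.apache.hadoop.hive.serde2.typeinfo.StructTypeInfo;
import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory;

public class ObjectArrayWritableSketch {
  public static void main(String[] args) {
    // Hypothetical row schema: struct<name:string, age:int>
    StructTypeInfo rowType = (StructTypeInfo) TypeInfoFactory.getStructTypeInfo(
        Arrays.asList("name", "age"),
        Arrays.<TypeInfo>asList(TypeInfoFactory.stringTypeInfo, TypeInfoFactory.intTypeInfo));

    ObjectArrayWritableObjectInspector inspector =
        new ObjectArrayWritableObjectInspector(rowType);

    // After this change a row is one ObjectArrayWritable over plain Objects,
    // rather than an ArrayWritable of nested per-field Writable wrappers.
    ObjectArrayWritable row = new ObjectArrayWritable(new Object[] { "alice", 30 });

    StructField ageRef = inspector.getStructFieldRef("age");
    Object age = inspector.getStructFieldData(row, ageRef);       // 30 (a java.lang.Integer)
    Object fields = inspector.getStructFieldsDataAsList(row);     // [alice, 30]
    System.out.println(age + " " + fields);
  }
}

The same shape applies to the map and list inspectors touched above: they now unwrap Object[] from ObjectArrayWritable instead of Writable[] from ArrayWritable, which is why ParquetByteInspector and ParquetShortInspector also gain branches that accept raw Integer values.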