Revert "HIVE-9605:Remove parquet nested objects from wrapper writable objects (Sergio Pena, reviewed by Ferdinand Xu)"
This reverts commit 4157374d1538afa7a36d2484a9e4abd1016a2ef3. Patch committed does not belong to HIVE-9605 ticket. Project: http://git-wip-us.apache.org/repos/asf/hive/repo Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/82c037af Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/82c037af Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/82c037af Branch: refs/heads/llap Commit: 82c037afefc2cfdaef8c17b285c6cde2e72ba6d6 Parents: f078dd4 Author: Sergio Pena <sergio.p...@cloudera.com> Authored: Wed May 27 10:10:23 2015 -0500 Committer: Sergio Pena <sergio.p...@cloudera.com> Committed: Wed May 27 10:10:23 2015 -0500 ---------------------------------------------------------------------- .../benchmark/storage/ColumnarStorageBench.java | 89 +++---- .../exec/vector/VectorColumnAssignFactory.java | 74 +++--- .../ql/io/parquet/MapredParquetInputFormat.java | 14 +- .../parquet/VectorizedParquetInputFormat.java | 19 +- .../ql/io/parquet/convert/ConverterParent.java | 4 +- .../convert/DataWritableRecordConverter.java | 6 +- .../ql/io/parquet/convert/ETypeConverter.java | 11 +- .../convert/HiveCollectionConverter.java | 23 +- .../io/parquet/convert/HiveGroupConverter.java | 2 +- .../io/parquet/convert/HiveStructConverter.java | 22 +- .../hive/ql/io/parquet/convert/Repeated.java | 21 +- .../parquet/read/DataWritableReadSupport.java | 6 +- .../read/ParquetRecordReaderWrapper.java | 25 +- .../serde/AbstractParquetMapInspector.java | 23 +- .../serde/ArrayWritableObjectInspector.java | 249 +++++++++++++++++++ .../serde/DeepParquetHiveMapInspector.java | 15 +- .../ObjectArrayWritableObjectInspector.java | 248 ------------------ .../serde/ParquetHiveArrayInspector.java | 35 ++- .../ql/io/parquet/serde/ParquetHiveSerDe.java | 8 +- .../serde/StandardParquetHiveMapInspector.java | 15 +- .../serde/primitive/ParquetByteInspector.java | 3 +- .../serde/primitive/ParquetShortInspector.java | 2 - .../io/parquet/AbstractTestParquetDirect.java | 38 +-- .../ql/io/parquet/TestArrayCompatibility.java | 88 +++---- .../ql/io/parquet/TestDataWritableWriter.java | 65 ++--- .../hive/ql/io/parquet/TestMapStructures.java | 30 +-- .../parquet/TestMapredParquetInputFormat.java | 4 +- .../io/parquet/TestParquetRowGroupFilter.java | 6 +- .../hive/serde2/io/ObjectArrayWritable.java | 51 ---- 29 files changed, 578 insertions(+), 618 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hive/blob/82c037af/itests/hive-jmh/src/main/java/org/apache/hive/benchmark/storage/ColumnarStorageBench.java ---------------------------------------------------------------------- diff --git a/itests/hive-jmh/src/main/java/org/apache/hive/benchmark/storage/ColumnarStorageBench.java b/itests/hive-jmh/src/main/java/org/apache/hive/benchmark/storage/ColumnarStorageBench.java index eef7800..61c2eb4 100644 --- a/itests/hive-jmh/src/main/java/org/apache/hive/benchmark/storage/ColumnarStorageBench.java +++ b/itests/hive-jmh/src/main/java/org/apache/hive/benchmark/storage/ColumnarStorageBench.java @@ -24,12 +24,12 @@ import org.apache.hadoop.hive.ql.io.orc.OrcOutputFormat; import org.apache.hadoop.hive.ql.io.orc.OrcSerde; import org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat; import org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat; -import org.apache.hadoop.hive.ql.io.parquet.serde.ObjectArrayWritableObjectInspector; +import org.apache.hadoop.hive.ql.io.parquet.serde.ArrayWritableObjectInspector; import org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe; import org.apache.hadoop.hive.serde2.SerDe; import org.apache.hadoop.hive.serde2.SerDeException; import org.apache.hadoop.hive.serde2.SerDeUtils; -import org.apache.hadoop.hive.serde2.io.ObjectArrayWritable; +import org.apache.hadoop.hive.serde2.io.DoubleWritable; import org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe; import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector; import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory; @@ -40,6 +40,9 @@ import org.apache.hadoop.hive.serde2.typeinfo.ListTypeInfo; import org.apache.hadoop.hive.serde2.typeinfo.MapTypeInfo; import org.apache.hadoop.hive.serde2.typeinfo.StructTypeInfo; import org.apache.hadoop.io.Writable; +import org.apache.hadoop.io.ArrayWritable; +import org.apache.hadoop.io.IntWritable; +import org.apache.hadoop.io.BooleanWritable; import org.apache.hadoop.io.BytesWritable; import org.apache.hadoop.mapred.FileSplit; @@ -169,16 +172,20 @@ public class ColumnarStorageBench { return fs.getFileStatus(path).getLen(); } - private Object createPrimitiveObject(final PrimitiveTypeInfo typeInfo) { + private ArrayWritable record(Writable... fields) { + return new ArrayWritable(Writable.class, fields); + } + + private Writable getPrimitiveWritable(final PrimitiveTypeInfo typeInfo) { Random rand = new Random(); switch (typeInfo.getPrimitiveCategory()) { case INT: - return rand.nextInt(); + return new IntWritable(rand.nextInt()); case DOUBLE: - return rand.nextDouble(); + return new DoubleWritable(rand.nextDouble()); case BOOLEAN: - return rand.nextBoolean(); + return new BooleanWritable(rand.nextBoolean()); case CHAR: case VARCHAR: case STRING: @@ -190,52 +197,36 @@ public class ColumnarStorageBench { } } - private Object crreateObjectFromType(final TypeInfo typeInfo) { - switch (typeInfo.getCategory()) { - case PRIMITIVE: - return createPrimitiveObject((PrimitiveTypeInfo) typeInfo); - case LIST: - return createListObject((ListTypeInfo) typeInfo); - case MAP: - return createMapObject((MapTypeInfo) typeInfo); - case STRUCT: - return createStructObject(((StructTypeInfo) typeInfo).getAllStructFieldTypeInfos()); - default: - throw new IllegalStateException("Invalid column type: " + typeInfo); - } - } - - private ObjectArrayWritable createListObject(final ListTypeInfo typeInfo) { - List<Object> list = new ArrayList<Object>(); - list.add(crreateObjectFromType(typeInfo.getListElementTypeInfo())); - - return new ObjectArrayWritable(list.toArray()); - } - - private ObjectArrayWritable createMapObject(final MapTypeInfo typeInfo) { - Object[] keyValue = new Object[2]; - keyValue[0] = crreateObjectFromType(typeInfo.getMapKeyTypeInfo()); - keyValue[1] = crreateObjectFromType(typeInfo.getMapValueTypeInfo()); - - ObjectArrayWritable map = new ObjectArrayWritable(keyValue); - - List<ObjectArrayWritable> list = new ArrayList<ObjectArrayWritable>(); - list.add(map); - - return new ObjectArrayWritable(list.toArray()); - } - - - - private ObjectArrayWritable createStructObject(final List<TypeInfo> columnTypes) { - Object[] fields = new Object[columnTypes.size()]; + private ArrayWritable createRecord(final List<TypeInfo> columnTypes) { + Writable[] fields = new Writable[columnTypes.size()]; int pos=0; for (TypeInfo type : columnTypes) { - fields[pos++] = crreateObjectFromType(type); + switch (type.getCategory()) { + case PRIMITIVE: + fields[pos++] = getPrimitiveWritable((PrimitiveTypeInfo)type); + break; + case LIST: { + List<TypeInfo> elementType = new ArrayList<TypeInfo>(); + elementType.add(((ListTypeInfo) type).getListElementTypeInfo()); + fields[pos++] = record(createRecord(elementType)); + } break; + case MAP: { + List<TypeInfo> keyValueType = new ArrayList<TypeInfo>(); + keyValueType.add(((MapTypeInfo) type).getMapKeyTypeInfo()); + keyValueType.add(((MapTypeInfo) type).getMapValueTypeInfo()); + fields[pos++] = record(record(createRecord(keyValueType))); + } break; + case STRUCT: { + List<TypeInfo> elementType = ((StructTypeInfo) type).getAllStructFieldTypeInfos(); + fields[pos++] = createRecord(elementType); + } break; + default: + throw new IllegalStateException("Invalid column type: " + type); + } } - return new ObjectArrayWritable(fields); + return record(fields); } private ObjectInspector getArrayWritableObjectInspector(final String columnTypes) { @@ -243,11 +234,11 @@ public class ColumnarStorageBench { List<String> columnNameList = Arrays.asList(getColumnNames(columnTypes).split(",")); StructTypeInfo rowTypeInfo = (StructTypeInfo)TypeInfoFactory.getStructTypeInfo(columnNameList, columnTypeList); - return new ObjectArrayWritableObjectInspector(rowTypeInfo); + return new ArrayWritableObjectInspector(rowTypeInfo); } private Object createRandomRow(final String columnTypes) throws SerDeException { - Writable recordWritable = createStructObject(TypeInfoUtils.getTypeInfosFromTypeString(columnTypes)); + Writable recordWritable = createRecord(TypeInfoUtils.getTypeInfosFromTypeString(columnTypes)); Writable simpleWritable = lazySimpleSerDe.serialize(recordWritable, getArrayWritableObjectInspector(columnTypes)); return lazySimpleSerDe.deserialize(simpleWritable); } http://git-wip-us.apache.org/repos/asf/hive/blob/82c037af/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorColumnAssignFactory.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorColumnAssignFactory.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorColumnAssignFactory.java index 6f2f1af..befe2fc 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorColumnAssignFactory.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorColumnAssignFactory.java @@ -48,6 +48,7 @@ import org.apache.hadoop.io.FloatWritable; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; +import org.apache.hadoop.io.Writable; import org.apache.hive.common.util.DateUtils; /** @@ -247,11 +248,10 @@ public class VectorColumnAssignFactory { public void assignObjectValue(Object val, int destIndex) throws HiveException { if (val == null) { assignNull(destIndex); - } else if (val instanceof BooleanWritable) { + } + else { BooleanWritable bw = (BooleanWritable) val; assignLong(bw.get() ? 1:0, destIndex); - } else { - assignLong((boolean)val ? 1:0, destIndex); } } }.init(outputBatch, (LongColumnVector) destCol); @@ -262,11 +262,10 @@ public class VectorColumnAssignFactory { public void assignObjectValue(Object val, int destIndex) throws HiveException { if (val == null) { assignNull(destIndex); - } else if (val instanceof ByteWritable) { + } + else { ByteWritable bw = (ByteWritable) val; assignLong(bw.get(), destIndex); - } else { - assignLong((byte)val, destIndex); } } }.init(outputBatch, (LongColumnVector) destCol); @@ -277,11 +276,10 @@ public class VectorColumnAssignFactory { public void assignObjectValue(Object val, int destIndex) throws HiveException { if (val == null) { assignNull(destIndex); - } else if (val instanceof ShortWritable) { + } + else { ShortWritable bw = (ShortWritable) val; assignLong(bw.get(), destIndex); - } else { - assignLong((short)val, destIndex); } } }.init(outputBatch, (LongColumnVector) destCol); @@ -292,11 +290,10 @@ public class VectorColumnAssignFactory { public void assignObjectValue(Object val, int destIndex) throws HiveException { if (val == null) { assignNull(destIndex); - } else if (val instanceof IntWritable) { + } + else { IntWritable bw = (IntWritable) val; assignLong(bw.get(), destIndex); - } else { - assignLong((int)val, destIndex); } } }.init(outputBatch, (LongColumnVector) destCol); @@ -307,11 +304,10 @@ public class VectorColumnAssignFactory { public void assignObjectValue(Object val, int destIndex) throws HiveException { if (val == null) { assignNull(destIndex); - } else if (val instanceof LongWritable) { + } + else { LongWritable bw = (LongWritable) val; assignLong(bw.get(), destIndex); - } else { - assignLong((long)val, destIndex); } } }.init(outputBatch, (LongColumnVector) destCol); @@ -387,11 +383,10 @@ public class VectorColumnAssignFactory { public void assignObjectValue(Object val, int destIndex) throws HiveException { if (val == null) { assignNull(destIndex); - } else if (val instanceof DoubleWritable) { + } + else { DoubleWritable bw = (DoubleWritable) val; assignDouble(bw.get(), destIndex); - } else { - assignDouble((double)val, destIndex); } } }.init(outputBatch, (DoubleColumnVector) destCol); @@ -402,11 +397,10 @@ public class VectorColumnAssignFactory { public void assignObjectValue(Object val, int destIndex) throws HiveException { if (val == null) { assignNull(destIndex); - } else if (val instanceof FloatWritable) { + } + else { FloatWritable bw = (FloatWritable) val; assignDouble(bw.get(), destIndex); - } else { - assignDouble((float)val, destIndex); } } }.init(outputBatch, (DoubleColumnVector) destCol); @@ -549,45 +543,45 @@ public class VectorColumnAssignFactory { } public static VectorColumnAssign[] buildAssigners(VectorizedRowBatch outputBatch, - Object[] values) throws HiveException { + Writable[] writables) throws HiveException { VectorColumnAssign[] vcas = new VectorColumnAssign[outputBatch.numCols]; - for (int i = 0; i < values.length; ++i) { - if (values[i] == null) { + for (int i = 0; i < writables.length; ++i) { + if (writables[i] == null) { assert(outputBatch.cols[i] == null); vcas[i] = buildObjectAssign(outputBatch, i, PrimitiveCategory.VOID); - } else if (values[i] instanceof ByteWritable) { + } else if (writables[i] instanceof ByteWritable) { vcas[i] = buildObjectAssign(outputBatch, i, PrimitiveCategory.BYTE); - } else if (values[i] instanceof ShortWritable) { + } else if (writables[i] instanceof ShortWritable) { vcas[i] = buildObjectAssign(outputBatch, i, PrimitiveCategory.SHORT); - } else if (values[i] instanceof IntWritable) { + } else if (writables[i] instanceof IntWritable) { vcas[i] = buildObjectAssign(outputBatch, i, PrimitiveCategory.INT); - } else if (values[i] instanceof LongWritable) { + } else if (writables[i] instanceof LongWritable) { vcas[i] = buildObjectAssign(outputBatch, i, PrimitiveCategory.LONG); - } else if (values[i] instanceof FloatWritable) { + } else if (writables[i] instanceof FloatWritable) { vcas[i] = buildObjectAssign(outputBatch, i, PrimitiveCategory.FLOAT); - } else if (values[i] instanceof DoubleWritable) { + } else if (writables[i] instanceof DoubleWritable) { vcas[i] = buildObjectAssign(outputBatch, i, PrimitiveCategory.DOUBLE); - } else if (values[i] instanceof Text) { + } else if (writables[i] instanceof Text) { vcas[i] = buildObjectAssign(outputBatch, i, PrimitiveCategory.STRING); - } else if (values[i] instanceof BytesWritable) { + } else if (writables[i] instanceof BytesWritable) { vcas[i] = buildObjectAssign(outputBatch, i, PrimitiveCategory.BINARY); - } else if (values[i] instanceof TimestampWritable) { + } else if (writables[i] instanceof TimestampWritable) { vcas[i] = buildObjectAssign(outputBatch, i, PrimitiveCategory.TIMESTAMP); - } else if (values[i] instanceof HiveIntervalYearMonthWritable) { + } else if (writables[i] instanceof HiveIntervalYearMonthWritable) { vcas[i] = buildObjectAssign(outputBatch, i, PrimitiveCategory.INTERVAL_YEAR_MONTH); - } else if (values[i] instanceof HiveIntervalDayTimeWritable) { + } else if (writables[i] instanceof HiveIntervalDayTimeWritable) { vcas[i] = buildObjectAssign(outputBatch, i, PrimitiveCategory.INTERVAL_DAY_TIME); - } else if (values[i] instanceof BooleanWritable) { + } else if (writables[i] instanceof BooleanWritable) { vcas[i] = buildObjectAssign(outputBatch, i, PrimitiveCategory.BOOLEAN); - } else if (values[i] instanceof HiveDecimalWritable) { + } else if (writables[i] instanceof HiveDecimalWritable) { vcas[i] = buildObjectAssign(outputBatch, i, PrimitiveCategory.DECIMAL); - } else if (values[i] instanceof HiveCharWritable) { + } else if (writables[i] instanceof HiveCharWritable) { vcas[i] = buildObjectAssign(outputBatch, i, PrimitiveCategory.CHAR); - } else if (values[i] instanceof HiveVarcharWritable) { + } else if (writables[i] instanceof HiveVarcharWritable) { vcas[i] = buildObjectAssign(outputBatch, i, PrimitiveCategory.VARCHAR); } else { throw new HiveException("Unimplemented vector assigner for writable type " + - values[i].getClass()); + writables[i].getClass()); } } return vcas; http://git-wip-us.apache.org/repos/asf/hive/blob/82c037af/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/MapredParquetInputFormat.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/MapredParquetInputFormat.java b/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/MapredParquetInputFormat.java index f7adb39..f4f0f07 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/MapredParquetInputFormat.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/MapredParquetInputFormat.java @@ -20,7 +20,7 @@ import org.apache.hadoop.hive.ql.exec.Utilities; import org.apache.hadoop.hive.ql.exec.vector.VectorizedInputFormatInterface; import org.apache.hadoop.hive.ql.io.parquet.read.DataWritableReadSupport; import org.apache.hadoop.hive.ql.io.parquet.read.ParquetRecordReaderWrapper; -import org.apache.hadoop.hive.serde2.io.ObjectArrayWritable; +import org.apache.hadoop.io.ArrayWritable; import org.apache.hadoop.io.NullWritable; import org.apache.hadoop.mapred.FileInputFormat; import org.apache.hadoop.mapred.RecordReader; @@ -35,26 +35,26 @@ import parquet.hadoop.ParquetInputFormat; * NOTE: With HIVE-9235 we removed "implements VectorizedParquetInputFormat" since all data types * are not currently supported. Removing the interface turns off vectorization. */ -public class MapredParquetInputFormat extends FileInputFormat<NullWritable, ObjectArrayWritable> { +public class MapredParquetInputFormat extends FileInputFormat<NullWritable, ArrayWritable> { private static final Log LOG = LogFactory.getLog(MapredParquetInputFormat.class); - private final ParquetInputFormat<ObjectArrayWritable> realInput; + private final ParquetInputFormat<ArrayWritable> realInput; private final transient VectorizedParquetInputFormat vectorizedSelf; public MapredParquetInputFormat() { - this(new ParquetInputFormat<ObjectArrayWritable>(DataWritableReadSupport.class)); + this(new ParquetInputFormat<ArrayWritable>(DataWritableReadSupport.class)); } - protected MapredParquetInputFormat(final ParquetInputFormat<ObjectArrayWritable> inputFormat) { + protected MapredParquetInputFormat(final ParquetInputFormat<ArrayWritable> inputFormat) { this.realInput = inputFormat; vectorizedSelf = new VectorizedParquetInputFormat(inputFormat); } @SuppressWarnings({ "unchecked", "rawtypes" }) @Override - public org.apache.hadoop.mapred.RecordReader<NullWritable, ObjectArrayWritable> getRecordReader( + public org.apache.hadoop.mapred.RecordReader<NullWritable, ArrayWritable> getRecordReader( final org.apache.hadoop.mapred.InputSplit split, final org.apache.hadoop.mapred.JobConf job, final org.apache.hadoop.mapred.Reporter reporter @@ -70,7 +70,7 @@ public class MapredParquetInputFormat extends FileInputFormat<NullWritable, Obje if (LOG.isDebugEnabled()) { LOG.debug("Using row-mode record reader"); } - return (RecordReader<NullWritable, ObjectArrayWritable>) + return (RecordReader<NullWritable, ArrayWritable>) new ParquetRecordReaderWrapper(realInput, split, job, reporter); } } catch (final InterruptedException e) { http://git-wip-us.apache.org/repos/asf/hive/blob/82c037af/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/VectorizedParquetInputFormat.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/VectorizedParquetInputFormat.java b/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/VectorizedParquetInputFormat.java index c557963..843e079 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/VectorizedParquetInputFormat.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/VectorizedParquetInputFormat.java @@ -23,8 +23,9 @@ import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch; import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatchCtx; import org.apache.hadoop.hive.ql.io.parquet.read.ParquetRecordReaderWrapper; import org.apache.hadoop.hive.ql.metadata.HiveException; -import org.apache.hadoop.hive.serde2.io.ObjectArrayWritable; +import org.apache.hadoop.io.ArrayWritable; import org.apache.hadoop.io.NullWritable; +import org.apache.hadoop.io.Writable; import org.apache.hadoop.mapred.FileInputFormat; import org.apache.hadoop.mapred.FileSplit; import org.apache.hadoop.mapred.InputSplit; @@ -51,12 +52,12 @@ public class VectorizedParquetInputFormat extends FileInputFormat<NullWritable, private final ParquetRecordReaderWrapper internalReader; private VectorizedRowBatchCtx rbCtx; - private ObjectArrayWritable internalValues; + private ArrayWritable internalValues; private NullWritable internalKey; private VectorColumnAssign[] assigners; public VectorizedParquetRecordReader( - ParquetInputFormat<ObjectArrayWritable> realInput, + ParquetInputFormat<ArrayWritable> realInput, FileSplit split, JobConf conf, Reporter reporter) throws IOException, InterruptedException { internalReader = new ParquetRecordReaderWrapper( @@ -119,17 +120,17 @@ public class VectorizedParquetInputFormat extends FileInputFormat<NullWritable, outputBatch.endOfFile = true; break; } - Object[] values = internalValues.get(); + Writable[] writables = internalValues.get(); if (null == assigners) { // Normally we'd build the assigners from the rbCtx.rowOI, but with Parquet // we have a discrepancy between the metadata type (Eg. tinyint -> BYTE) and // the writable value (IntWritable). see Parquet's ETypeConverter class. - assigners = VectorColumnAssignFactory.buildAssigners(outputBatch, values); + assigners = VectorColumnAssignFactory.buildAssigners(outputBatch, writables); } - for(int i = 0; i < values.length; ++i) { - assigners[i].assignObjectValue(values[i], outputBatch.size); + for(int i = 0; i < writables.length; ++i) { + assigners[i].assignObjectValue(writables[i], outputBatch.size); } ++outputBatch.size; } @@ -140,9 +141,9 @@ public class VectorizedParquetInputFormat extends FileInputFormat<NullWritable, } } - private final ParquetInputFormat<ObjectArrayWritable> realInput; + private final ParquetInputFormat<ArrayWritable> realInput; - public VectorizedParquetInputFormat(ParquetInputFormat<ObjectArrayWritable> realInput) { + public VectorizedParquetInputFormat(ParquetInputFormat<ArrayWritable> realInput) { this.realInput = realInput; } http://git-wip-us.apache.org/repos/asf/hive/blob/82c037af/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/convert/ConverterParent.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/convert/ConverterParent.java b/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/convert/ConverterParent.java index 3a6cb9e..6ff6b47 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/convert/ConverterParent.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/convert/ConverterParent.java @@ -13,10 +13,12 @@ */ package org.apache.hadoop.hive.ql.io.parquet.convert; +import org.apache.hadoop.io.Writable; + import java.util.Map; interface ConverterParent { - void set(int index, Object value); + void set(int index, Writable value); Map<String, String> getMetadata(); } http://git-wip-us.apache.org/repos/asf/hive/blob/82c037af/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/convert/DataWritableRecordConverter.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/convert/DataWritableRecordConverter.java b/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/convert/DataWritableRecordConverter.java index 8ad38f2..e9d1131 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/convert/DataWritableRecordConverter.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/convert/DataWritableRecordConverter.java @@ -14,7 +14,7 @@ package org.apache.hadoop.hive.ql.io.parquet.convert; import org.apache.hadoop.hive.ql.io.parquet.read.DataWritableReadSupport; -import org.apache.hadoop.hive.serde2.io.ObjectArrayWritable; +import org.apache.hadoop.io.ArrayWritable; import parquet.io.api.GroupConverter; import parquet.io.api.RecordMaterializer; import parquet.schema.GroupType; @@ -27,7 +27,7 @@ import java.util.Map; * A MapWritableReadSupport, encapsulates the tuples * */ -public class DataWritableRecordConverter extends RecordMaterializer<ObjectArrayWritable> { +public class DataWritableRecordConverter extends RecordMaterializer<ArrayWritable> { private final HiveStructConverter root; @@ -37,7 +37,7 @@ public class DataWritableRecordConverter extends RecordMaterializer<ObjectArrayW } @Override - public ObjectArrayWritable getCurrentRecord() { + public ArrayWritable getCurrentRecord() { return root.getCurrentArray(); } http://git-wip-us.apache.org/repos/asf/hive/blob/82c037af/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/convert/ETypeConverter.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/convert/ETypeConverter.java b/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/convert/ETypeConverter.java index 04ded03..3fc0129 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/convert/ETypeConverter.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/convert/ETypeConverter.java @@ -48,11 +48,12 @@ public enum ETypeConverter { EDOUBLE_CONVERTER(Double.TYPE) { @Override + PrimitiveConverter getConverter(final PrimitiveType type, final int index, final ConverterParent parent) { return new PrimitiveConverter() { @Override public void addDouble(final double value) { - parent.set(index, value); + parent.set(index, new DoubleWritable(value)); } }; } @@ -63,7 +64,7 @@ public enum ETypeConverter { return new PrimitiveConverter() { @Override public void addBoolean(final boolean value) { - parent.set(index, value); + parent.set(index, new BooleanWritable(value)); } }; } @@ -74,7 +75,7 @@ public enum ETypeConverter { return new PrimitiveConverter() { @Override public void addFloat(final float value) { - parent.set(index, value); + parent.set(index, new FloatWritable(value)); } }; } @@ -85,7 +86,7 @@ public enum ETypeConverter { return new PrimitiveConverter() { @Override public void addInt(final int value) { - parent.set(index, value); + parent.set(index, new IntWritable(value)); } }; } @@ -96,7 +97,7 @@ public enum ETypeConverter { return new PrimitiveConverter() { @Override public void addLong(final long value) { - parent.set(index, value); + parent.set(index, new LongWritable(value)); } }; } http://git-wip-us.apache.org/repos/asf/hive/blob/82c037af/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/convert/HiveCollectionConverter.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/convert/HiveCollectionConverter.java b/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/convert/HiveCollectionConverter.java index cd3fd6e..0fd538e 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/convert/HiveCollectionConverter.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/convert/HiveCollectionConverter.java @@ -20,11 +20,11 @@ package org.apache.hadoop.hive.ql.io.parquet.convert; import com.google.common.base.Preconditions; import java.util.ArrayList; -import java.util.Arrays; import java.util.List; import java.util.Map; -import org.apache.hadoop.hive.serde2.io.ObjectArrayWritable; +import org.apache.hadoop.io.ArrayWritable; +import org.apache.hadoop.io.Writable; import parquet.io.api.Converter; import parquet.schema.GroupType; import parquet.schema.Type; @@ -34,7 +34,7 @@ public class HiveCollectionConverter extends HiveGroupConverter { private final ConverterParent parent; private final int index; private final Converter innerConverter; - private final List<Object> list = new ArrayList<Object>(); + private final List<Writable> list = new ArrayList<Writable>(); public static HiveGroupConverter forMap(GroupType mapType, ConverterParent parent, @@ -83,11 +83,12 @@ public class HiveCollectionConverter extends HiveGroupConverter { @Override public void end() { - parent.set(index, new ObjectArrayWritable(list.toArray())); + parent.set(index, wrapList(new ArrayWritable( + Writable.class, list.toArray(new Writable[list.size()])))); } @Override - public void set(int index, Object value) { + public void set(int index, Writable value) { list.add(value); } @@ -95,7 +96,7 @@ public class HiveCollectionConverter extends HiveGroupConverter { private final HiveGroupConverter parent; private final Converter keyConverter; private final Converter valueConverter; - private Object[] keyValue = new Object[2]; + private Writable[] keyValue = null; public KeyValueConverter(GroupType keyValueType, HiveGroupConverter parent) { setMetadata(parent.getMetadata()); @@ -107,7 +108,7 @@ public class HiveCollectionConverter extends HiveGroupConverter { } @Override - public void set(int fieldIndex, Object value) { + public void set(int fieldIndex, Writable value) { keyValue[fieldIndex] = value; } @@ -126,19 +127,19 @@ public class HiveCollectionConverter extends HiveGroupConverter { @Override public void start() { - Arrays.fill(keyValue, null); + this.keyValue = new Writable[2]; } @Override public void end() { - parent.set(0, new ObjectArrayWritable(keyValue)); + parent.set(0, new ArrayWritable(Writable.class, keyValue)); } } private static class ElementConverter extends HiveGroupConverter { private final HiveGroupConverter parent; private final Converter elementConverter; - private Object element = null; + private Writable element = null; public ElementConverter(GroupType repeatedType, HiveGroupConverter parent) { setMetadata(parent.getMetadata()); @@ -148,7 +149,7 @@ public class HiveCollectionConverter extends HiveGroupConverter { } @Override - public void set(int index, Object value) { + public void set(int index, Writable value) { this.element = value; } http://git-wip-us.apache.org/repos/asf/hive/blob/82c037af/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/convert/HiveGroupConverter.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/convert/HiveGroupConverter.java b/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/convert/HiveGroupConverter.java index d516f05..4809f9b 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/convert/HiveGroupConverter.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/convert/HiveGroupConverter.java @@ -81,6 +81,6 @@ public abstract class HiveGroupConverter extends GroupConverter implements Conve return new ArrayWritable(Writable.class, new Writable[] {list}); } - public abstract void set(int index, Object value); + public abstract void set(int index, Writable value); } http://git-wip-us.apache.org/repos/asf/hive/blob/82c037af/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/convert/HiveStructConverter.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/convert/HiveStructConverter.java b/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/convert/HiveStructConverter.java index 4d06c36..f95d15e 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/convert/HiveStructConverter.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/convert/HiveStructConverter.java @@ -14,11 +14,11 @@ package org.apache.hadoop.hive.ql.io.parquet.convert; import java.util.ArrayList; -import java.util.Arrays; import java.util.List; import java.util.Map; -import org.apache.hadoop.hive.serde2.io.ObjectArrayWritable; +import org.apache.hadoop.io.ArrayWritable; +import org.apache.hadoop.io.Writable; import parquet.io.api.Converter; import parquet.schema.GroupType; import parquet.schema.Type; @@ -34,7 +34,7 @@ public class HiveStructConverter extends HiveGroupConverter { private final Converter[] converters; private final ConverterParent parent; private final int index; - private Object[] elements; + private Writable[] writables; private final List<Repeated> repeatedConverters; private boolean reuseWritableArray = false; @@ -42,7 +42,7 @@ public class HiveStructConverter extends HiveGroupConverter { this(requestedSchema, null, 0, tableSchema); setMetadata(metadata); this.reuseWritableArray = true; - this.elements = new Object[tableSchema.getFieldCount()]; + this.writables = new Writable[tableSchema.getFieldCount()]; } public HiveStructConverter(final GroupType groupType, final ConverterParent parent, @@ -95,13 +95,13 @@ public class HiveStructConverter extends HiveGroupConverter { return converter; } - public final ObjectArrayWritable getCurrentArray() { - return new ObjectArrayWritable(elements); + public final ArrayWritable getCurrentArray() { + return new ArrayWritable(Writable.class, writables); } @Override - public void set(int fieldIndex, Object value) { - elements[fieldIndex] = value; + public void set(int fieldIndex, Writable value) { + writables[fieldIndex] = value; } @Override @@ -113,9 +113,11 @@ public class HiveStructConverter extends HiveGroupConverter { public void start() { if (reuseWritableArray) { // reset the array to null values - Arrays.fill(elements, null); + for (int i = 0; i < writables.length; i += 1) { + writables[i] = null; + } } else { - this.elements = new Object[totalFieldCount]; + this.writables = new Writable[totalFieldCount]; } for (Repeated repeated : repeatedConverters) { repeated.parentStart(); http://git-wip-us.apache.org/repos/asf/hive/blob/82c037af/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/convert/Repeated.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/convert/Repeated.java b/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/convert/Repeated.java index c84caec..0130aef 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/convert/Repeated.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/convert/Repeated.java @@ -19,10 +19,12 @@ package org.apache.hadoop.hive.ql.io.parquet.convert; import java.util.ArrayList; +import java.util.HashMap; import java.util.List; import java.util.Map; -import org.apache.hadoop.hive.serde2.io.ObjectArrayWritable; +import org.apache.hadoop.io.ArrayWritable; +import org.apache.hadoop.io.Writable; import parquet.column.Dictionary; import parquet.io.api.Binary; import parquet.io.api.Converter; @@ -61,7 +63,7 @@ public interface Repeated extends ConverterParent { private final PrimitiveConverter wrapped; private final ConverterParent parent; private final int index; - private final List<Object> list = new ArrayList<Object>(); + private final List<Writable> list = new ArrayList<Writable>(); public RepeatedPrimitiveConverter(PrimitiveType primitiveType, ConverterParent parent, int index) { setMetadata(parent.getMetadata()); @@ -123,11 +125,12 @@ public interface Repeated extends ConverterParent { @Override public void parentEnd() { - parent.set(index, new ObjectArrayWritable(list.toArray())); + parent.set(index, HiveGroupConverter.wrapList(new ArrayWritable( + Writable.class, list.toArray(new Writable[list.size()])))); } @Override - public void set(int index, Object value) { + public void set(int index, Writable value) { list.add(value); } } @@ -138,21 +141,24 @@ public interface Repeated extends ConverterParent { */ class RepeatedGroupConverter extends HiveGroupConverter implements Repeated { + private final GroupType groupType; private final HiveGroupConverter wrapped; private final ConverterParent parent; private final int index; - private final List<Object> list = new ArrayList<Object>(); + private final List<Writable> list = new ArrayList<Writable>(); + private final Map<String, String> metadata = new HashMap<String, String>(); public RepeatedGroupConverter(GroupType groupType, ConverterParent parent, int index) { setMetadata(parent.getMetadata()); + this.groupType = groupType; this.parent = parent; this.index = index; this.wrapped = HiveGroupConverter.getConverterFromDescription(groupType, 0, this); } @Override - public void set(int fieldIndex, Object value) { + public void set(int fieldIndex, Writable value) { list.add(value); } @@ -179,7 +185,8 @@ public interface Repeated extends ConverterParent { @Override public void parentEnd() { - parent.set(index, new ObjectArrayWritable(list.toArray())); + parent.set(index, wrapList(new ArrayWritable( + Writable.class, list.toArray(new Writable[list.size()])))); } } } http://git-wip-us.apache.org/repos/asf/hive/blob/82c037af/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/read/DataWritableReadSupport.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/read/DataWritableReadSupport.java b/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/read/DataWritableReadSupport.java index 71e9550..dcd46bd 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/read/DataWritableReadSupport.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/read/DataWritableReadSupport.java @@ -29,7 +29,7 @@ import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector; import org.apache.hadoop.hive.serde2.typeinfo.StructTypeInfo; import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo; import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils; -import org.apache.hadoop.hive.serde2.io.ObjectArrayWritable; +import org.apache.hadoop.io.ArrayWritable; import org.apache.hadoop.util.StringUtils; import parquet.hadoop.api.InitContext; @@ -48,7 +48,7 @@ import parquet.schema.PrimitiveType.PrimitiveTypeName; * Manages the translation between Hive and Parquet * */ -public class DataWritableReadSupport extends ReadSupport<ObjectArrayWritable> { +public class DataWritableReadSupport extends ReadSupport<ArrayWritable> { public static final String HIVE_TABLE_AS_PARQUET_SCHEMA = "HIVE_TABLE_SCHEMA"; public static final String PARQUET_COLUMN_INDEX_ACCESS = "parquet.column.index.access"; @@ -245,7 +245,7 @@ public class DataWritableReadSupport extends ReadSupport<ObjectArrayWritable> { * @return Record Materialize for Hive */ @Override - public RecordMaterializer<ObjectArrayWritable> prepareForRead(final Configuration configuration, + public RecordMaterializer<ArrayWritable> prepareForRead(final Configuration configuration, final Map<String, String> keyValueMetaData, final MessageType fileSchema, final parquet.hadoop.api.ReadSupport.ReadContext readContext) { final Map<String, String> metadata = readContext.getReadSupportMetadata(); http://git-wip-us.apache.org/repos/asf/hive/blob/82c037af/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/read/ParquetRecordReaderWrapper.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/read/ParquetRecordReaderWrapper.java b/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/read/ParquetRecordReaderWrapper.java index 6341c08..5c36564 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/read/ParquetRecordReaderWrapper.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/read/ParquetRecordReaderWrapper.java @@ -33,8 +33,9 @@ import org.apache.hadoop.hive.ql.io.parquet.ProjectionPusher; import org.apache.hadoop.hive.ql.io.sarg.SearchArgumentFactory; import org.apache.hadoop.hive.ql.plan.TableScanDesc; import org.apache.hadoop.hive.serde2.ColumnProjectionUtils; +import org.apache.hadoop.io.ArrayWritable; import org.apache.hadoop.io.NullWritable; -import org.apache.hadoop.hive.serde2.io.ObjectArrayWritable; +import org.apache.hadoop.io.Writable; import org.apache.hadoop.mapred.FileSplit; import org.apache.hadoop.mapred.InputSplit; import org.apache.hadoop.mapred.JobConf; @@ -59,15 +60,15 @@ import parquet.schema.MessageTypeParser; import com.google.common.base.Strings; -public class ParquetRecordReaderWrapper implements RecordReader<NullWritable, ObjectArrayWritable> { +public class ParquetRecordReaderWrapper implements RecordReader<NullWritable, ArrayWritable> { public static final Log LOG = LogFactory.getLog(ParquetRecordReaderWrapper.class); private final long splitLen; // for getPos() - private org.apache.hadoop.mapreduce.RecordReader<Void, ObjectArrayWritable> realReader; + private org.apache.hadoop.mapreduce.RecordReader<Void, ArrayWritable> realReader; // expect readReader return same Key & Value objects (common case) // this avoids extra serialization & deserialization of these objects - private ObjectArrayWritable valueObj = null; + private ArrayWritable valueObj = null; private boolean firstRecord = false; private boolean eof = false; private int schemaSize; @@ -77,7 +78,7 @@ public class ParquetRecordReaderWrapper implements RecordReader<NullWritable, O private List<BlockMetaData> filtedBlocks; public ParquetRecordReaderWrapper( - final ParquetInputFormat<ObjectArrayWritable> newInputFormat, + final ParquetInputFormat<ArrayWritable> newInputFormat, final InputSplit oldSplit, final JobConf oldJobConf, final Reporter reporter) @@ -86,7 +87,7 @@ public class ParquetRecordReaderWrapper implements RecordReader<NullWritable, O } public ParquetRecordReaderWrapper( - final ParquetInputFormat<ObjectArrayWritable> newInputFormat, + final ParquetInputFormat<ArrayWritable> newInputFormat, final InputSplit oldSplit, final JobConf oldJobConf, final Reporter reporter, @@ -133,7 +134,7 @@ public class ParquetRecordReaderWrapper implements RecordReader<NullWritable, O eof = true; } if (valueObj == null) { // Should initialize the value for createValue - valueObj = new ObjectArrayWritable(new Object[schemaSize]); + valueObj = new ArrayWritable(Writable.class, new Writable[schemaSize]); } } @@ -173,7 +174,7 @@ public class ParquetRecordReaderWrapper implements RecordReader<NullWritable, O } @Override - public ObjectArrayWritable createValue() { + public ArrayWritable createValue() { return valueObj; } @@ -196,7 +197,7 @@ public class ParquetRecordReaderWrapper implements RecordReader<NullWritable, O } @Override - public boolean next(final NullWritable key, final ObjectArrayWritable value) throws IOException { + public boolean next(final NullWritable key, final ArrayWritable value) throws IOException { if (eof) { return false; } @@ -208,10 +209,10 @@ public class ParquetRecordReaderWrapper implements RecordReader<NullWritable, O return false; } - final ObjectArrayWritable tmpCurValue = realReader.getCurrentValue(); + final ArrayWritable tmpCurValue = realReader.getCurrentValue(); if (value != tmpCurValue) { - final Object[] arrValue = value.get(); - final Object[] arrCurrent = tmpCurValue.get(); + final Writable[] arrValue = value.get(); + final Writable[] arrCurrent = tmpCurValue.get(); if (value != null && arrValue.length == arrCurrent.length) { System.arraycopy(arrCurrent, 0, arrValue, 0, arrCurrent.length); } else { http://git-wip-us.apache.org/repos/asf/hive/blob/82c037af/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/serde/AbstractParquetMapInspector.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/serde/AbstractParquetMapInspector.java b/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/serde/AbstractParquetMapInspector.java index 8f68fda..62c61fc 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/serde/AbstractParquetMapInspector.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/serde/AbstractParquetMapInspector.java @@ -17,9 +17,10 @@ import java.util.HashMap; import java.util.LinkedHashMap; import java.util.Map; -import org.apache.hadoop.hive.serde2.io.ObjectArrayWritable; import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector; import org.apache.hadoop.hive.serde2.objectinspector.SettableMapObjectInspector; +import org.apache.hadoop.io.ArrayWritable; +import org.apache.hadoop.io.Writable; public abstract class AbstractParquetMapInspector implements SettableMapObjectInspector { @@ -57,19 +58,19 @@ public abstract class AbstractParquetMapInspector implements SettableMapObjectIn return null; } - if (data instanceof ObjectArrayWritable) { - final Object[] mapContainer = ((ObjectArrayWritable) data).get(); + if (data instanceof ArrayWritable) { + final Writable[] mapContainer = ((ArrayWritable) data).get(); if (mapContainer == null || mapContainer.length == 0) { return null; } - final Object[] mapArray = ((ObjectArrayWritable) mapContainer[0]).get(); - final Map<Object, Object> map = new LinkedHashMap<Object, Object>(); + final Writable[] mapArray = ((ArrayWritable) mapContainer[0]).get(); + final Map<Writable, Writable> map = new LinkedHashMap<Writable, Writable>(); - for (final Object obj : mapArray) { - final ObjectArrayWritable mapObj = (ObjectArrayWritable) obj; - final Object[] arr = mapObj.get(); + for (final Writable obj : mapArray) { + final ArrayWritable mapObj = (ArrayWritable) obj; + final Writable[] arr = mapObj.get(); map.put(arr[0], arr[1]); } @@ -89,13 +90,13 @@ public abstract class AbstractParquetMapInspector implements SettableMapObjectIn return -1; } - if (data instanceof ObjectArrayWritable) { - final Object[] mapContainer = ((ObjectArrayWritable) data).get(); + if (data instanceof ArrayWritable) { + final Writable[] mapContainer = ((ArrayWritable) data).get(); if (mapContainer == null || mapContainer.length == 0) { return -1; } else { - return ((ObjectArrayWritable) mapContainer[0]).get().length; + return ((ArrayWritable) mapContainer[0]).get().length; } } http://git-wip-us.apache.org/repos/asf/hive/blob/82c037af/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/serde/ArrayWritableObjectInspector.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/serde/ArrayWritableObjectInspector.java b/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/serde/ArrayWritableObjectInspector.java new file mode 100644 index 0000000..6091882 --- /dev/null +++ b/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/serde/ArrayWritableObjectInspector.java @@ -0,0 +1,249 @@ +/** + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hive.ql.io.parquet.serde; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.HashMap; +import java.util.List; + +import org.apache.hadoop.hive.ql.io.parquet.serde.primitive.ParquetPrimitiveInspectorFactory; +import org.apache.hadoop.hive.serde.serdeConstants; +import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector; +import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector; +import org.apache.hadoop.hive.serde2.objectinspector.SettableStructObjectInspector; +import org.apache.hadoop.hive.serde2.objectinspector.StructField; +import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory; +import org.apache.hadoop.hive.serde2.typeinfo.CharTypeInfo; +import org.apache.hadoop.hive.serde2.typeinfo.DecimalTypeInfo; +import org.apache.hadoop.hive.serde2.typeinfo.ListTypeInfo; +import org.apache.hadoop.hive.serde2.typeinfo.MapTypeInfo; +import org.apache.hadoop.hive.serde2.typeinfo.StructTypeInfo; +import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo; +import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory; +import org.apache.hadoop.hive.serde2.typeinfo.VarcharTypeInfo; +import org.apache.hadoop.io.ArrayWritable; + +/** + * + * The ArrayWritableObjectInspector will inspect an ArrayWritable, considering it as a Hive struct.<br /> + * It can also inspect a List if Hive decides to inspect the result of an inspection. + * + */ +public class ArrayWritableObjectInspector extends SettableStructObjectInspector { + + private final TypeInfo typeInfo; + private final List<TypeInfo> fieldInfos; + private final List<String> fieldNames; + private final List<StructField> fields; + private final HashMap<String, StructFieldImpl> fieldsByName; + + public ArrayWritableObjectInspector(final StructTypeInfo rowTypeInfo) { + + typeInfo = rowTypeInfo; + fieldNames = rowTypeInfo.getAllStructFieldNames(); + fieldInfos = rowTypeInfo.getAllStructFieldTypeInfos(); + fields = new ArrayList<StructField>(fieldNames.size()); + fieldsByName = new HashMap<String, StructFieldImpl>(); + + for (int i = 0; i < fieldNames.size(); ++i) { + final String name = fieldNames.get(i); + final TypeInfo fieldInfo = fieldInfos.get(i); + + final StructFieldImpl field = new StructFieldImpl(name, getObjectInspector(fieldInfo), i); + fields.add(field); + fieldsByName.put(name, field); + } + } + + private ObjectInspector getObjectInspector(final TypeInfo typeInfo) { + if (typeInfo.equals(TypeInfoFactory.doubleTypeInfo)) { + return PrimitiveObjectInspectorFactory.writableDoubleObjectInspector; + } else if (typeInfo.equals(TypeInfoFactory.booleanTypeInfo)) { + return PrimitiveObjectInspectorFactory.writableBooleanObjectInspector; + } else if (typeInfo.equals(TypeInfoFactory.floatTypeInfo)) { + return PrimitiveObjectInspectorFactory.writableFloatObjectInspector; + } else if (typeInfo.equals(TypeInfoFactory.intTypeInfo)) { + return PrimitiveObjectInspectorFactory.writableIntObjectInspector; + } else if (typeInfo.equals(TypeInfoFactory.longTypeInfo)) { + return PrimitiveObjectInspectorFactory.writableLongObjectInspector; + } else if (typeInfo.equals(TypeInfoFactory.stringTypeInfo)) { + return ParquetPrimitiveInspectorFactory.parquetStringInspector; + } else if (typeInfo instanceof DecimalTypeInfo) { + return PrimitiveObjectInspectorFactory.getPrimitiveWritableObjectInspector((DecimalTypeInfo) typeInfo); + } else if (typeInfo.getCategory().equals(Category.STRUCT)) { + return new ArrayWritableObjectInspector((StructTypeInfo) typeInfo); + } else if (typeInfo.getCategory().equals(Category.LIST)) { + final TypeInfo subTypeInfo = ((ListTypeInfo) typeInfo).getListElementTypeInfo(); + return new ParquetHiveArrayInspector(getObjectInspector(subTypeInfo)); + } else if (typeInfo.getCategory().equals(Category.MAP)) { + final TypeInfo keyTypeInfo = ((MapTypeInfo) typeInfo).getMapKeyTypeInfo(); + final TypeInfo valueTypeInfo = ((MapTypeInfo) typeInfo).getMapValueTypeInfo(); + if (keyTypeInfo.equals(TypeInfoFactory.stringTypeInfo) || keyTypeInfo.equals(TypeInfoFactory.byteTypeInfo) + || keyTypeInfo.equals(TypeInfoFactory.shortTypeInfo)) { + return new DeepParquetHiveMapInspector(getObjectInspector(keyTypeInfo), getObjectInspector(valueTypeInfo)); + } else { + return new StandardParquetHiveMapInspector(getObjectInspector(keyTypeInfo), getObjectInspector(valueTypeInfo)); + } + } else if (typeInfo.equals(TypeInfoFactory.byteTypeInfo)) { + return ParquetPrimitiveInspectorFactory.parquetByteInspector; + } else if (typeInfo.equals(TypeInfoFactory.shortTypeInfo)) { + return ParquetPrimitiveInspectorFactory.parquetShortInspector; + } else if (typeInfo.equals(TypeInfoFactory.timestampTypeInfo)) { + return PrimitiveObjectInspectorFactory.writableTimestampObjectInspector; + } else if (typeInfo.equals(TypeInfoFactory.binaryTypeInfo)){ + return PrimitiveObjectInspectorFactory.writableBinaryObjectInspector; + }else if (typeInfo.equals(TypeInfoFactory.dateTypeInfo)) { + return PrimitiveObjectInspectorFactory.writableDateObjectInspector; + } else if (typeInfo.getTypeName().toLowerCase().startsWith(serdeConstants.CHAR_TYPE_NAME)) { + return PrimitiveObjectInspectorFactory.getPrimitiveWritableObjectInspector((CharTypeInfo) typeInfo); + } else if (typeInfo.getTypeName().toLowerCase().startsWith(serdeConstants.VARCHAR_TYPE_NAME)) { + return PrimitiveObjectInspectorFactory.getPrimitiveWritableObjectInspector((VarcharTypeInfo) typeInfo); + } else { + throw new UnsupportedOperationException("Unknown field type: " + typeInfo); + } + + } + + @Override + public Category getCategory() { + return Category.STRUCT; + } + + @Override + public String getTypeName() { + return typeInfo.getTypeName(); + } + + @Override + public List<? extends StructField> getAllStructFieldRefs() { + return fields; + } + + @Override + public Object getStructFieldData(final Object data, final StructField fieldRef) { + if (data == null) { + return null; + } + + if (data instanceof ArrayWritable) { + final ArrayWritable arr = (ArrayWritable) data; + return arr.get()[((StructFieldImpl) fieldRef).getIndex()]; + } + + //since setStructFieldData and create return a list, getStructFieldData should be able to + //handle list data. This is required when table serde is ParquetHiveSerDe and partition serde + //is something else. + if (data instanceof List) { + return ((List) data).get(((StructFieldImpl) fieldRef).getIndex()); + } + + throw new UnsupportedOperationException("Cannot inspect " + data.getClass().getCanonicalName()); + } + + @Override + public StructField getStructFieldRef(final String name) { + return fieldsByName.get(name); + } + + @Override + public List<Object> getStructFieldsDataAsList(final Object data) { + if (data == null) { + return null; + } + + if (data instanceof ArrayWritable) { + final ArrayWritable arr = (ArrayWritable) data; + final Object[] arrWritable = arr.get(); + return new ArrayList<Object>(Arrays.asList(arrWritable)); + } + + throw new UnsupportedOperationException("Cannot inspect " + data.getClass().getCanonicalName()); + } + + @Override + public Object create() { + final ArrayList<Object> list = new ArrayList<Object>(fields.size()); + for (int i = 0; i < fields.size(); ++i) { + list.add(null); + } + return list; + } + + @Override + public Object setStructFieldData(Object struct, StructField field, Object fieldValue) { + final ArrayList<Object> list = (ArrayList<Object>) struct; + list.set(((StructFieldImpl) field).getIndex(), fieldValue); + return list; + } + + @Override + public boolean equals(Object obj) { + if (obj == null) { + return false; + } + if (getClass() != obj.getClass()) { + return false; + } + final ArrayWritableObjectInspector other = (ArrayWritableObjectInspector) obj; + if (this.typeInfo != other.typeInfo && (this.typeInfo == null || !this.typeInfo.equals(other.typeInfo))) { + return false; + } + return true; + } + + @Override + public int hashCode() { + int hash = 5; + hash = 29 * hash + (this.typeInfo != null ? this.typeInfo.hashCode() : 0); + return hash; + } + + class StructFieldImpl implements StructField { + + private final String name; + private final ObjectInspector inspector; + private final int index; + + public StructFieldImpl(final String name, final ObjectInspector inspector, final int index) { + this.name = name; + this.inspector = inspector; + this.index = index; + } + + @Override + public String getFieldComment() { + return ""; + } + + @Override + public String getFieldName() { + return name; + } + + public int getIndex() { + return index; + } + + @Override + public ObjectInspector getFieldObjectInspector() { + return inspector; + } + + @Override + public int getFieldID() { + return index; + } + } +} http://git-wip-us.apache.org/repos/asf/hive/blob/82c037af/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/serde/DeepParquetHiveMapInspector.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/serde/DeepParquetHiveMapInspector.java b/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/serde/DeepParquetHiveMapInspector.java index 80fdb22..d38c641 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/serde/DeepParquetHiveMapInspector.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/serde/DeepParquetHiveMapInspector.java @@ -15,9 +15,10 @@ package org.apache.hadoop.hive.ql.io.parquet.serde; import java.util.Map; -import org.apache.hadoop.hive.serde2.io.ObjectArrayWritable; import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector; import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector; +import org.apache.hadoop.io.ArrayWritable; +import org.apache.hadoop.io.Writable; /** * The DeepParquetHiveMapInspector will inspect an ArrayWritable, considering it as a Hive map.<br /> @@ -38,18 +39,18 @@ public class DeepParquetHiveMapInspector extends AbstractParquetMapInspector { return null; } - if (data instanceof ObjectArrayWritable) { - final Object[] mapContainer = ((ObjectArrayWritable) data).get(); + if (data instanceof ArrayWritable) { + final Writable[] mapContainer = ((ArrayWritable) data).get(); if (mapContainer == null || mapContainer.length == 0) { return null; } - final Object[] mapArray = ((ObjectArrayWritable) mapContainer[0]).get(); + final Writable[] mapArray = ((ArrayWritable) mapContainer[0]).get(); - for (final Object obj : mapArray) { - final ObjectArrayWritable mapObj = (ObjectArrayWritable) obj; - final Object[] arr = mapObj.get(); + for (final Writable obj : mapArray) { + final ArrayWritable mapObj = (ArrayWritable) obj; + final Writable[] arr = mapObj.get(); if (key.equals(arr[0]) || key.equals(((PrimitiveObjectInspector) keyInspector).getPrimitiveJavaObject(arr[0])) || key.equals(((PrimitiveObjectInspector) keyInspector).getPrimitiveWritableObject(arr[0]))) { return arr[1]; http://git-wip-us.apache.org/repos/asf/hive/blob/82c037af/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/serde/ObjectArrayWritableObjectInspector.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/serde/ObjectArrayWritableObjectInspector.java b/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/serde/ObjectArrayWritableObjectInspector.java deleted file mode 100644 index 571f993..0000000 --- a/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/serde/ObjectArrayWritableObjectInspector.java +++ /dev/null @@ -1,248 +0,0 @@ -/** - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.hive.ql.io.parquet.serde; - -import java.util.ArrayList; -import java.util.Arrays; -import java.util.HashMap; -import java.util.List; - -import org.apache.hadoop.hive.ql.io.parquet.serde.primitive.ParquetPrimitiveInspectorFactory; -import org.apache.hadoop.hive.serde.serdeConstants; -import org.apache.hadoop.hive.serde2.io.ObjectArrayWritable; -import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector; -import org.apache.hadoop.hive.serde2.objectinspector.SettableStructObjectInspector; -import org.apache.hadoop.hive.serde2.objectinspector.StructField; -import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory; -import org.apache.hadoop.hive.serde2.typeinfo.CharTypeInfo; -import org.apache.hadoop.hive.serde2.typeinfo.DecimalTypeInfo; -import org.apache.hadoop.hive.serde2.typeinfo.ListTypeInfo; -import org.apache.hadoop.hive.serde2.typeinfo.MapTypeInfo; -import org.apache.hadoop.hive.serde2.typeinfo.StructTypeInfo; -import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo; -import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory; -import org.apache.hadoop.hive.serde2.typeinfo.VarcharTypeInfo; - -/** - * - * The ArrayWritableObjectInspector will inspect an ArrayWritable, considering it as a Hive struct.<br /> - * It can also inspect a List if Hive decides to inspect the result of an inspection. - * - */ -public class ObjectArrayWritableObjectInspector extends SettableStructObjectInspector { - - private final TypeInfo typeInfo; - private final List<TypeInfo> fieldInfos; - private final List<String> fieldNames; - private final List<StructField> fields; - private final HashMap<String, StructFieldImpl> fieldsByName; - - public ObjectArrayWritableObjectInspector(final StructTypeInfo rowTypeInfo) { - - typeInfo = rowTypeInfo; - fieldNames = rowTypeInfo.getAllStructFieldNames(); - fieldInfos = rowTypeInfo.getAllStructFieldTypeInfos(); - fields = new ArrayList<StructField>(fieldNames.size()); - fieldsByName = new HashMap<String, StructFieldImpl>(); - - for (int i = 0; i < fieldNames.size(); ++i) { - final String name = fieldNames.get(i); - final TypeInfo fieldInfo = fieldInfos.get(i); - - final StructFieldImpl field = new StructFieldImpl(name, getObjectInspector(fieldInfo), i); - fields.add(field); - fieldsByName.put(name, field); - } - } - - private ObjectInspector getObjectInspector(final TypeInfo typeInfo) { - if (typeInfo.equals(TypeInfoFactory.doubleTypeInfo)) { - return PrimitiveObjectInspectorFactory.javaDoubleObjectInspector; - } else if (typeInfo.equals(TypeInfoFactory.booleanTypeInfo)) { - return PrimitiveObjectInspectorFactory.javaBooleanObjectInspector; - } else if (typeInfo.equals(TypeInfoFactory.floatTypeInfo)) { - return PrimitiveObjectInspectorFactory.javaFloatObjectInspector; - } else if (typeInfo.equals(TypeInfoFactory.intTypeInfo)) { - return PrimitiveObjectInspectorFactory.javaIntObjectInspector; - } else if (typeInfo.equals(TypeInfoFactory.longTypeInfo)) { - return PrimitiveObjectInspectorFactory.javaLongObjectInspector; - } else if (typeInfo.equals(TypeInfoFactory.stringTypeInfo)) { - return ParquetPrimitiveInspectorFactory.parquetStringInspector; - } else if (typeInfo instanceof DecimalTypeInfo) { - return PrimitiveObjectInspectorFactory.getPrimitiveWritableObjectInspector((DecimalTypeInfo) typeInfo); - } else if (typeInfo.getCategory().equals(Category.STRUCT)) { - return new ObjectArrayWritableObjectInspector((StructTypeInfo) typeInfo); - } else if (typeInfo.getCategory().equals(Category.LIST)) { - final TypeInfo subTypeInfo = ((ListTypeInfo) typeInfo).getListElementTypeInfo(); - return new ParquetHiveArrayInspector(getObjectInspector(subTypeInfo)); - } else if (typeInfo.getCategory().equals(Category.MAP)) { - final TypeInfo keyTypeInfo = ((MapTypeInfo) typeInfo).getMapKeyTypeInfo(); - final TypeInfo valueTypeInfo = ((MapTypeInfo) typeInfo).getMapValueTypeInfo(); - if (keyTypeInfo.equals(TypeInfoFactory.stringTypeInfo) || keyTypeInfo.equals(TypeInfoFactory.byteTypeInfo) - || keyTypeInfo.equals(TypeInfoFactory.shortTypeInfo)) { - return new DeepParquetHiveMapInspector(getObjectInspector(keyTypeInfo), getObjectInspector(valueTypeInfo)); - } else { - return new StandardParquetHiveMapInspector(getObjectInspector(keyTypeInfo), getObjectInspector(valueTypeInfo)); - } - } else if (typeInfo.equals(TypeInfoFactory.byteTypeInfo)) { - return ParquetPrimitiveInspectorFactory.parquetByteInspector; - } else if (typeInfo.equals(TypeInfoFactory.shortTypeInfo)) { - return ParquetPrimitiveInspectorFactory.parquetShortInspector; - } else if (typeInfo.equals(TypeInfoFactory.timestampTypeInfo)) { - return PrimitiveObjectInspectorFactory.writableTimestampObjectInspector; - } else if (typeInfo.equals(TypeInfoFactory.binaryTypeInfo)){ - return PrimitiveObjectInspectorFactory.writableBinaryObjectInspector; - }else if (typeInfo.equals(TypeInfoFactory.dateTypeInfo)) { - return PrimitiveObjectInspectorFactory.writableDateObjectInspector; - } else if (typeInfo.getTypeName().toLowerCase().startsWith(serdeConstants.CHAR_TYPE_NAME)) { - return PrimitiveObjectInspectorFactory.getPrimitiveWritableObjectInspector((CharTypeInfo) typeInfo); - } else if (typeInfo.getTypeName().toLowerCase().startsWith(serdeConstants.VARCHAR_TYPE_NAME)) { - return PrimitiveObjectInspectorFactory.getPrimitiveWritableObjectInspector((VarcharTypeInfo) typeInfo); - } else { - throw new UnsupportedOperationException("Unknown field type: " + typeInfo); - } - - } - - @Override - public Category getCategory() { - return Category.STRUCT; - } - - @Override - public String getTypeName() { - return typeInfo.getTypeName(); - } - - @Override - public List<? extends StructField> getAllStructFieldRefs() { - return fields; - } - - @Override - public Object getStructFieldData(final Object data, final StructField fieldRef) { - if (data == null) { - return null; - } - - if (data instanceof ObjectArrayWritable) { - final ObjectArrayWritable arr = (ObjectArrayWritable) data; - return arr.get()[((StructFieldImpl) fieldRef).getIndex()]; - } - - //since setStructFieldData and create return a list, getStructFieldData should be able to - //handle list data. This is required when table serde is ParquetHiveSerDe and partition serde - //is something else. - if (data instanceof List) { - return ((List) data).get(((StructFieldImpl) fieldRef).getIndex()); - } - - throw new UnsupportedOperationException("Cannot inspect " + data.getClass().getCanonicalName()); - } - - @Override - public StructField getStructFieldRef(final String name) { - return fieldsByName.get(name); - } - - @Override - public List<Object> getStructFieldsDataAsList(final Object data) { - if (data == null) { - return null; - } - - if (data instanceof ObjectArrayWritable) { - final ObjectArrayWritable arr = (ObjectArrayWritable) data; - final Object[] arrObjects = arr.get(); - return Arrays.asList(arrObjects); - } - - throw new UnsupportedOperationException("Cannot inspect " + data.getClass().getCanonicalName()); - } - - @Override - public Object create() { - final ArrayList<Object> list = new ArrayList<Object>(fields.size()); - for (int i = 0; i < fields.size(); ++i) { - list.add(null); - } - return list; - } - - @Override - public Object setStructFieldData(Object struct, StructField field, Object fieldValue) { - final ArrayList<Object> list = (ArrayList<Object>) struct; - list.set(((StructFieldImpl) field).getIndex(), fieldValue); - return list; - } - - @Override - public boolean equals(Object obj) { - if (obj == null) { - return false; - } - if (getClass() != obj.getClass()) { - return false; - } - final ObjectArrayWritableObjectInspector other = (ObjectArrayWritableObjectInspector) obj; - if (this.typeInfo != other.typeInfo && (this.typeInfo == null || !this.typeInfo.equals(other.typeInfo))) { - return false; - } - return true; - } - - @Override - public int hashCode() { - int hash = 5; - hash = 29 * hash + (this.typeInfo != null ? this.typeInfo.hashCode() : 0); - return hash; - } - - class StructFieldImpl implements StructField { - - private final String name; - private final ObjectInspector inspector; - private final int index; - - public StructFieldImpl(final String name, final ObjectInspector inspector, final int index) { - this.name = name; - this.inspector = inspector; - this.index = index; - } - - @Override - public String getFieldComment() { - return ""; - } - - @Override - public String getFieldName() { - return name; - } - - public int getIndex() { - return index; - } - - @Override - public ObjectInspector getFieldObjectInspector() { - return inspector; - } - - @Override - public int getFieldID() { - return index; - } - } -} http://git-wip-us.apache.org/repos/asf/hive/blob/82c037af/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/serde/ParquetHiveArrayInspector.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/serde/ParquetHiveArrayInspector.java b/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/serde/ParquetHiveArrayInspector.java index e276359..53ca31d 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/serde/ParquetHiveArrayInspector.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/serde/ParquetHiveArrayInspector.java @@ -16,11 +16,10 @@ package org.apache.hadoop.hive.ql.io.parquet.serde; import java.util.ArrayList; import java.util.List; -import org.apache.hadoop.hive.serde2.io.ObjectArrayWritable; import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector; import org.apache.hadoop.hive.serde2.objectinspector.SettableListObjectInspector; - -import java.util.Arrays; +import org.apache.hadoop.io.ArrayWritable; +import org.apache.hadoop.io.Writable; /** * The ParquetHiveArrayInspector will inspect an ArrayWritable, considering it as an Hive array.<br /> @@ -56,21 +55,21 @@ public class ParquetHiveArrayInspector implements SettableListObjectInspector { return null; } - if (data instanceof ObjectArrayWritable) { - final Object[] listContainer = ((ObjectArrayWritable) data).get(); + if (data instanceof ArrayWritable) { + final Writable[] listContainer = ((ArrayWritable) data).get(); if (listContainer == null || listContainer.length == 0) { return null; } - final Object subObj = listContainer[0]; + final Writable subObj = listContainer[0]; if (subObj == null) { return null; } - if (index >= 0 && index < ((ObjectArrayWritable) subObj).get().length) { - return ((ObjectArrayWritable) subObj).get()[index]; + if (index >= 0 && index < ((ArrayWritable) subObj).get().length) { + return ((ArrayWritable) subObj).get()[index]; } else { return null; } @@ -85,20 +84,20 @@ public class ParquetHiveArrayInspector implements SettableListObjectInspector { return -1; } - if (data instanceof ObjectArrayWritable) { - final Object[] listContainer = ((ObjectArrayWritable) data).get(); + if (data instanceof ArrayWritable) { + final Writable[] listContainer = ((ArrayWritable) data).get(); if (listContainer == null || listContainer.length == 0) { return -1; } - final Object subObj = listContainer[0]; + final Writable subObj = listContainer[0]; if (subObj == null) { return 0; } - return ((ObjectArrayWritable) subObj).get().length; + return ((ArrayWritable) subObj).get().length; } throw new UnsupportedOperationException("Cannot inspect " + data.getClass().getCanonicalName()); @@ -110,23 +109,23 @@ public class ParquetHiveArrayInspector implements SettableListObjectInspector { return null; } - if (data instanceof ObjectArrayWritable) { - final Object[] listContainer = ((ObjectArrayWritable) data).get(); + if (data instanceof ArrayWritable) { + final Writable[] listContainer = ((ArrayWritable) data).get(); if (listContainer == null || listContainer.length == 0) { return null; } - final Object subObj = listContainer[0]; + final Writable subObj = listContainer[0]; if (subObj == null) { return null; } - final Object[] array = ((ObjectArrayWritable) subObj).get(); - final List<Object> list = new ArrayList<Object>(); + final Writable[] array = ((ArrayWritable) subObj).get(); + final List<Writable> list = new ArrayList<Writable>(); - for (final Object obj : array) { + for (final Writable obj : array) { list.add(obj); } http://git-wip-us.apache.org/repos/asf/hive/blob/82c037af/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/serde/ParquetHiveSerDe.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/serde/ParquetHiveSerDe.java b/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/serde/ParquetHiveSerDe.java index dc4f896..7fd5e96 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/serde/ParquetHiveSerDe.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/serde/ParquetHiveSerDe.java @@ -24,7 +24,6 @@ import org.apache.hadoop.hive.serde2.AbstractSerDe; import org.apache.hadoop.hive.serde2.SerDeException; import org.apache.hadoop.hive.serde2.SerDeSpec; import org.apache.hadoop.hive.serde2.SerDeStats; -import org.apache.hadoop.hive.serde2.io.ObjectArrayWritable; import org.apache.hadoop.hive.serde2.io.ParquetHiveRecord; import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector; import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector.Category; @@ -34,6 +33,7 @@ import org.apache.hadoop.hive.serde2.typeinfo.StructTypeInfo; import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo; import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoFactory; import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils; +import org.apache.hadoop.io.ArrayWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.io.Writable; import parquet.hadoop.ParquetOutputFormat; @@ -118,7 +118,7 @@ public class ParquetHiveSerDe extends AbstractSerDe { } // Create row related objects rowTypeInfo = TypeInfoFactory.getStructTypeInfo(columnNames, columnTypes); - this.objInspector = new ObjectArrayWritableObjectInspector((StructTypeInfo) rowTypeInfo); + this.objInspector = new ArrayWritableObjectInspector((StructTypeInfo) rowTypeInfo); // Stats part serializedSize = 0; @@ -130,8 +130,8 @@ public class ParquetHiveSerDe extends AbstractSerDe { public Object deserialize(final Writable blob) throws SerDeException { status = LAST_OPERATION.DESERIALIZE; deserializedSize = 0; - if (blob instanceof ObjectArrayWritable) { - deserializedSize = ((ObjectArrayWritable) blob).get().length; + if (blob instanceof ArrayWritable) { + deserializedSize = ((ArrayWritable) blob).get().length; return blob; } else { return null; http://git-wip-us.apache.org/repos/asf/hive/blob/82c037af/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/serde/StandardParquetHiveMapInspector.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/serde/StandardParquetHiveMapInspector.java b/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/serde/StandardParquetHiveMapInspector.java index 0ee7e2c..5aa1448 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/serde/StandardParquetHiveMapInspector.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/serde/StandardParquetHiveMapInspector.java @@ -15,8 +15,9 @@ package org.apache.hadoop.hive.ql.io.parquet.serde; import java.util.Map; -import org.apache.hadoop.hive.serde2.io.ObjectArrayWritable; import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector; +import org.apache.hadoop.io.ArrayWritable; +import org.apache.hadoop.io.Writable; /** * The StandardParquetHiveMapInspector will inspect an ArrayWritable, considering it as a Hive map.<br /> @@ -35,16 +36,16 @@ public class StandardParquetHiveMapInspector extends AbstractParquetMapInspector if (data == null || key == null) { return null; } - if (data instanceof ObjectArrayWritable) { - final Object[] mapContainer = ((ObjectArrayWritable) data).get(); + if (data instanceof ArrayWritable) { + final Writable[] mapContainer = ((ArrayWritable) data).get(); if (mapContainer == null || mapContainer.length == 0) { return null; } - final Object[] mapArray = ((ObjectArrayWritable) mapContainer[0]).get(); - for (final Object obj : mapArray) { - final ObjectArrayWritable mapObj = (ObjectArrayWritable) obj; - final Object[] arr = mapObj.get(); + final Writable[] mapArray = ((ArrayWritable) mapContainer[0]).get(); + for (final Writable obj : mapArray) { + final ArrayWritable mapObj = (ArrayWritable) obj; + final Writable[] arr = mapObj.get(); if (key.equals(arr[0])) { return arr[1]; } http://git-wip-us.apache.org/repos/asf/hive/blob/82c037af/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/serde/primitive/ParquetByteInspector.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/serde/primitive/ParquetByteInspector.java b/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/serde/primitive/ParquetByteInspector.java index b31f85c..864f562 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/serde/primitive/ParquetByteInspector.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/serde/primitive/ParquetByteInspector.java @@ -52,10 +52,9 @@ public class ParquetByteInspector extends AbstractPrimitiveJavaObjectInspector i @Override public byte get(Object o) { + // Accept int writables and convert them. if (o instanceof IntWritable) { return (byte) ((IntWritable) o).get(); - } else if (o instanceof Integer) { - return ((Integer) o).byteValue(); } return ((ByteWritable) o).get(); http://git-wip-us.apache.org/repos/asf/hive/blob/82c037af/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/serde/primitive/ParquetShortInspector.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/serde/primitive/ParquetShortInspector.java b/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/serde/primitive/ParquetShortInspector.java index 0acf350..39f2657 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/serde/primitive/ParquetShortInspector.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/serde/primitive/ParquetShortInspector.java @@ -55,8 +55,6 @@ public class ParquetShortInspector extends AbstractPrimitiveJavaObjectInspector // Accept int writables and convert them. if (o instanceof IntWritable) { return (short) ((IntWritable) o).get(); - } else if (o instanceof Integer) { - return ((Integer) o).shortValue(); } return ((ShortWritable) o).get(); http://git-wip-us.apache.org/repos/asf/hive/blob/82c037af/ql/src/test/org/apache/hadoop/hive/ql/io/parquet/AbstractTestParquetDirect.java ---------------------------------------------------------------------- diff --git a/ql/src/test/org/apache/hadoop/hive/ql/io/parquet/AbstractTestParquetDirect.java b/ql/src/test/org/apache/hadoop/hive/ql/io/parquet/AbstractTestParquetDirect.java index 9c4cf5c..94a780d 100644 --- a/ql/src/test/org/apache/hadoop/hive/ql/io/parquet/AbstractTestParquetDirect.java +++ b/ql/src/test/org/apache/hadoop/hive/ql/io/parquet/AbstractTestParquetDirect.java @@ -32,7 +32,7 @@ import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe; import org.apache.hadoop.hive.serde.serdeConstants; -import org.apache.hadoop.hive.serde2.io.ObjectArrayWritable; +import org.apache.hadoop.io.ArrayWritable; import org.apache.hadoop.io.NullWritable; import org.apache.hadoop.io.Writable; import org.apache.hadoop.mapred.FileSplit; @@ -109,45 +109,45 @@ public abstract class AbstractTestParquetDirect { return path; } - public static ObjectArrayWritable record(Object... fields) { - return new ObjectArrayWritable(fields); + public static ArrayWritable record(Writable... fields) { + return new ArrayWritable(Writable.class, fields); } - public static ObjectArrayWritable list(Object... elements) { + public static ArrayWritable list(Writable... elements) { // the ObjectInspector for array<?> and map<?, ?> expects an extra layer - return new ObjectArrayWritable(new Object[] { - new ObjectArrayWritable(elements) + return new ArrayWritable(ArrayWritable.class, new ArrayWritable[] { + new ArrayWritable(Writable.class, elements) }); } - public static String toString(ObjectArrayWritable arrayWritable) { - Object[] elements = arrayWritable.get(); - String[] strings = new String[elements.length]; - for (int i = 0; i < elements.length; i += 1) { - if (elements[i] instanceof ObjectArrayWritable) { - strings[i] = toString((ObjectArrayWritable) elements[i]); + public static String toString(ArrayWritable arrayWritable) { + Writable[] writables = arrayWritable.get(); + String[] strings = new String[writables.length]; + for (int i = 0; i < writables.length; i += 1) { + if (writables[i] instanceof ArrayWritable) { + strings[i] = toString((ArrayWritable) writables[i]); } else { - strings[i] = String.valueOf(elements[i]); + strings[i] = String.valueOf(writables[i]); } } return Arrays.toString(strings); } - public static void assertEquals(String message, ObjectArrayWritable expected, - ObjectArrayWritable actual) { + public static void assertEquals(String message, ArrayWritable expected, + ArrayWritable actual) { Assert.assertEquals(message, toString(expected), toString(actual)); } - public static List<ObjectArrayWritable> read(Path parquetFile) throws IOException { - List<ObjectArrayWritable> records = new ArrayList<ObjectArrayWritable>(); + public static List<ArrayWritable> read(Path parquetFile) throws IOException { + List<ArrayWritable> records = new ArrayList<ArrayWritable>(); - RecordReader<NullWritable, ObjectArrayWritable> reader = new MapredParquetInputFormat(). + RecordReader<NullWritable, ArrayWritable> reader = new MapredParquetInputFormat(). getRecordReader(new FileSplit( parquetFile, 0, fileLength(parquetFile), (String[]) null), new JobConf(), null); NullWritable alwaysNull = reader.createKey(); - ObjectArrayWritable record = reader.createValue(); + ArrayWritable record = reader.createValue(); while (reader.next(alwaysNull, record)) { records.add(record); record = reader.createValue(); // a new value so the last isn't clobbered