HIVE-11131: Get row information on DataWritableWriter once for better writing performance (Sergio Pena, reviewed by Ferdinand Xu, Dong Chen & Ryan Blue)
Project: http://git-wip-us.apache.org/repos/asf/hive/repo Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/82074894 Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/82074894 Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/82074894 Branch: refs/heads/beeline-cli Commit: 8207489451debe98051898df6b1ec0833fc80bb3 Parents: a2dabcb Author: Sergio Pena <sergio.p...@cloudera.com> Authored: Thu Jul 9 16:37:10 2015 -0500 Committer: Sergio Pena <sergio.p...@cloudera.com> Committed: Thu Jul 9 16:37:10 2015 -0500 ---------------------------------------------------------------------- .../ql/io/parquet/write/DataWritableWriter.java | 638 +++++++++++++------ 1 file changed, 426 insertions(+), 212 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hive/blob/82074894/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/write/DataWritableWriter.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/write/DataWritableWriter.java b/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/write/DataWritableWriter.java index c195c3e..493cd36 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/write/DataWritableWriter.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/write/DataWritableWriter.java @@ -20,8 +20,26 @@ import org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe; import org.apache.hadoop.hive.ql.io.parquet.timestamp.NanoTimeUtils; import org.apache.hadoop.hive.serde2.io.DateWritable; import org.apache.hadoop.hive.serde2.io.ParquetHiveRecord; -import org.apache.hadoop.hive.serde2.objectinspector.*; -import org.apache.hadoop.hive.serde2.objectinspector.primitive.*; +import org.apache.hadoop.hive.serde2.objectinspector.ListObjectInspector; +import org.apache.hadoop.hive.serde2.objectinspector.MapObjectInspector; +import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector; +import 
org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector; +import org.apache.hadoop.hive.serde2.objectinspector.StructField; +import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector; +import org.apache.hadoop.hive.serde2.objectinspector.primitive.BinaryObjectInspector; +import org.apache.hadoop.hive.serde2.objectinspector.primitive.BooleanObjectInspector; +import org.apache.hadoop.hive.serde2.objectinspector.primitive.ByteObjectInspector; +import org.apache.hadoop.hive.serde2.objectinspector.primitive.DateObjectInspector; +import org.apache.hadoop.hive.serde2.objectinspector.primitive.DoubleObjectInspector; +import org.apache.hadoop.hive.serde2.objectinspector.primitive.FloatObjectInspector; +import org.apache.hadoop.hive.serde2.objectinspector.primitive.HiveCharObjectInspector; +import org.apache.hadoop.hive.serde2.objectinspector.primitive.HiveDecimalObjectInspector; +import org.apache.hadoop.hive.serde2.objectinspector.primitive.HiveVarcharObjectInspector; +import org.apache.hadoop.hive.serde2.objectinspector.primitive.IntObjectInspector; +import org.apache.hadoop.hive.serde2.objectinspector.primitive.LongObjectInspector; +import org.apache.hadoop.hive.serde2.objectinspector.primitive.ShortObjectInspector; +import org.apache.hadoop.hive.serde2.objectinspector.primitive.StringObjectInspector; +import org.apache.hadoop.hive.serde2.objectinspector.primitive.TimestampObjectInspector; import org.apache.hadoop.hive.serde2.typeinfo.DecimalTypeInfo; import org.apache.parquet.io.api.Binary; import org.apache.parquet.io.api.RecordConsumer; @@ -36,87 +54,104 @@ import java.util.Map; /** * - * DataWritableWriter is a writer that reads a ParquetWritable object and send the data to the Parquet - * API with the expected schema. This class is only used through DataWritableWriteSupport class. + * DataWritableWriter sends a record to the Parquet API with the expected schema in order + * to be written to a file. 
+ * This class is only used through DataWritableWriteSupport class. */ public class DataWritableWriter { private static final Log LOG = LogFactory.getLog(DataWritableWriter.class); - private final RecordConsumer recordConsumer; + protected final RecordConsumer recordConsumer; private final GroupType schema; + /* This writer will be created when writing the first row in order to get + information about how to inspect the record data. */ + private DataWriter messageWriter; + public DataWritableWriter(final RecordConsumer recordConsumer, final GroupType schema) { this.recordConsumer = recordConsumer; this.schema = schema; } /** - * It writes all record values to the Parquet RecordConsumer. - * @param record Contains the record that are going to be written. + * It writes a record to Parquet. + * @param record Contains the record that is going to be written. */ public void write(final ParquetHiveRecord record) { if (record != null) { - recordConsumer.startMessage(); - try { - writeGroupFields(record.getObject(), record.getObjectInspector(), schema); - } catch (RuntimeException e) { - String errorMessage = "Parquet record is malformed: " + e.getMessage(); - LOG.error(errorMessage, e); - throw new RuntimeException(errorMessage, e); + if (messageWriter == null) { + try { + messageWriter = createMessageWriter(record.getObjectInspector(), schema); + } catch (RuntimeException e) { + String errorMessage = "Parquet record is malformed: " + e.getMessage(); + LOG.error(errorMessage, e); + throw new RuntimeException(errorMessage, e); + } } - recordConsumer.endMessage(); + + messageWriter.write(record.getObject()); } } - /** - * It writes all the fields contained inside a group to the RecordConsumer. - * @param value The list of values contained in the group. - * @param inspector The object inspector used to get the correct value type. - * @param type Type that contains information about the group schema. 
- */ - private void writeGroupFields(final Object value, final StructObjectInspector inspector, final GroupType type) { - if (value != null) { - List<? extends StructField> fields = inspector.getAllStructFieldRefs(); - List<Object> fieldValuesList = inspector.getStructFieldsDataAsList(value); - - for (int i = 0; i < type.getFieldCount(); i++) { - Type fieldType = type.getType(i); - String fieldName = fieldType.getName(); - Object fieldValue = fieldValuesList.get(i); - - if (fieldValue != null) { - ObjectInspector fieldInspector = fields.get(i).getFieldObjectInspector(); - recordConsumer.startField(fieldName, i); - writeValue(fieldValue, fieldInspector, fieldType); - recordConsumer.endField(fieldName, i); - } - } - } + private MessageDataWriter createMessageWriter(StructObjectInspector inspector, GroupType schema) { + return new MessageDataWriter(inspector, schema); } /** - * It writes the field value to the Parquet RecordConsumer. It detects the field type, and calls - * the correct write function. - * @param value The writable object that contains the value. + * Creates a writer for the specific object inspector. The returned writer will be used + * to call Parquet API for the specific data type. * @param inspector The object inspector used to get the correct value type. * @param type Type that contains information about the type schema. + * @return A DataWriter object used to call the Parquet API for the specific data type. 
*/ - private void writeValue(final Object value, final ObjectInspector inspector, final Type type) { + private DataWriter createWriter(ObjectInspector inspector, Type type) { if (type.isPrimitive()) { checkInspectorCategory(inspector, ObjectInspector.Category.PRIMITIVE); - writePrimitive(value, (PrimitiveObjectInspector)inspector); + PrimitiveObjectInspector primitiveInspector = (PrimitiveObjectInspector)inspector; + switch (primitiveInspector.getPrimitiveCategory()) { + case BOOLEAN: + return new BooleanDataWriter((BooleanObjectInspector)inspector); + case BYTE: + return new ByteDataWriter((ByteObjectInspector)inspector); + case SHORT: + return new ShortDataWriter((ShortObjectInspector)inspector); + case INT: + return new IntDataWriter((IntObjectInspector)inspector); + case LONG: + return new LongDataWriter((LongObjectInspector)inspector); + case FLOAT: + return new FloatDataWriter((FloatObjectInspector)inspector); + case DOUBLE: + return new DoubleDataWriter((DoubleObjectInspector)inspector); + case STRING: + return new StringDataWriter((StringObjectInspector)inspector); + case CHAR: + return new CharDataWriter((HiveCharObjectInspector)inspector); + case VARCHAR: + return new VarcharDataWriter((HiveVarcharObjectInspector)inspector); + case BINARY: + return new BinaryDataWriter((BinaryObjectInspector)inspector); + case TIMESTAMP: + return new TimestampDataWriter((TimestampObjectInspector)inspector); + case DECIMAL: + return new DecimalDataWriter((HiveDecimalObjectInspector)inspector); + case DATE: + return new DateDataWriter((DateObjectInspector)inspector); + default: + throw new IllegalArgumentException("Unsupported primitive data type: " + primitiveInspector.getPrimitiveCategory()); + } } else { GroupType groupType = type.asGroupType(); OriginalType originalType = type.getOriginalType(); if (originalType != null && originalType.equals(OriginalType.LIST)) { checkInspectorCategory(inspector, ObjectInspector.Category.LIST); - writeArray(value, 
(ListObjectInspector)inspector, groupType); + return new ListDataWriter((ListObjectInspector)inspector, groupType); } else if (originalType != null && originalType.equals(OriginalType.MAP)) { checkInspectorCategory(inspector, ObjectInspector.Category.MAP); - writeMap(value, (MapObjectInspector)inspector, groupType); + return new MapDataWriter((MapObjectInspector)inspector, groupType); } else { checkInspectorCategory(inspector, ObjectInspector.Category.STRUCT); - writeGroup(value, (StructObjectInspector)inspector, groupType); + return new StructDataWriter((StructObjectInspector)inspector, groupType); } } } @@ -134,203 +169,382 @@ public class DataWritableWriter { } } - /** - * It writes a group type and all its values to the Parquet RecordConsumer. - * This is used only for optional and required groups. - * @param value Object that contains the group values. - * @param inspector The object inspector used to get the correct value type. - * @param type Type that contains information about the group schema. - */ - private void writeGroup(final Object value, final StructObjectInspector inspector, final GroupType type) { - recordConsumer.startGroup(); - writeGroupFields(value, inspector, type); - recordConsumer.endGroup(); + private interface DataWriter { + void write(Object value); } - /** - * It writes a list type and its array elements to the Parquet RecordConsumer. - * This is called when the original type (LIST) is detected by writeValue()/ - * This function assumes the following schema: - * optional group arrayCol (LIST) { - * repeated group array { - * optional TYPE array_element; - * } - * } - * @param value The object that contains the array values. - * @param inspector The object inspector used to get the correct value type. - * @param type Type that contains information about the group (LIST) schema. 
- */ - private void writeArray(final Object value, final ListObjectInspector inspector, final GroupType type) { - // Get the internal array structure - GroupType repeatedType = type.getType(0).asGroupType(); + private class GroupDataWriter implements DataWriter { + private StructObjectInspector inspector; + private List<? extends StructField> structFields; + private DataWriter[] structWriters; - recordConsumer.startGroup(); - recordConsumer.startField(repeatedType.getName(), 0); + public GroupDataWriter(StructObjectInspector inspector, GroupType groupType) { + this.inspector = inspector; - List<?> arrayValues = inspector.getList(value); - ObjectInspector elementInspector = inspector.getListElementObjectInspector(); + structFields = this.inspector.getAllStructFieldRefs(); + structWriters = new DataWriter[structFields.size()]; - Type elementType = repeatedType.getType(0); - String elementName = elementType.getName(); + for (int i = 0; i < structFields.size(); i++) { + StructField field = structFields.get(i); + structWriters[i] = createWriter(field.getFieldObjectInspector(), groupType.getType(i)); + } + } - for (Object element : arrayValues) { - recordConsumer.startGroup(); - if (element != null) { - recordConsumer.startField(elementName, 0); - writeValue(element, elementInspector, elementType); - recordConsumer.endField(elementName, 0); + @Override + public void write(Object value) { + for (int i = 0; i < structFields.size(); i++) { + StructField field = structFields.get(i); + Object fieldValue = inspector.getStructFieldData(value, field); + + if (fieldValue != null) { + String fieldName = field.getFieldName(); + DataWriter writer = structWriters[i]; + + recordConsumer.startField(fieldName, i); + writer.write(fieldValue); + recordConsumer.endField(fieldName, i); + } } - recordConsumer.endGroup(); + } + } + + private class MessageDataWriter extends GroupDataWriter implements DataWriter { + public MessageDataWriter(StructObjectInspector inspector, GroupType groupType) 
{ + super(inspector, groupType); } - recordConsumer.endField(repeatedType.getName(), 0); - recordConsumer.endGroup(); + @Override + public void write(Object value) { + recordConsumer.startMessage(); + if (value != null) { + super.write(value); + } + recordConsumer.endMessage(); + } } - /** - * It writes a map type and its key-pair values to the Parquet RecordConsumer. - * This is called when the original type (MAP) is detected by writeValue(). - * This function assumes the following schema: - * optional group mapCol (MAP) { - * repeated group map (MAP_KEY_VALUE) { - * required TYPE key; - * optional TYPE value; - * } - * } - * @param value The object that contains the map key-values. - * @param inspector The object inspector used to get the correct value type. - * @param type Type that contains information about the group (MAP) schema. - */ - private void writeMap(final Object value, final MapObjectInspector inspector, final GroupType type) { - // Get the internal map structure (MAP_KEY_VALUE) - GroupType repeatedType = type.getType(0).asGroupType(); + private class StructDataWriter extends GroupDataWriter implements DataWriter { + public StructDataWriter(StructObjectInspector inspector, GroupType groupType) { + super(inspector, groupType); + } + + @Override + public void write(Object value) { + recordConsumer.startGroup(); + super.write(value); + recordConsumer.endGroup(); + } + } + + private class ListDataWriter implements DataWriter { + private ListObjectInspector inspector; + private String elementName; + private DataWriter elementWriter; + private String repeatedGroupName; - recordConsumer.startGroup(); - recordConsumer.startField(repeatedType.getName(), 0); + public ListDataWriter(ListObjectInspector inspector, GroupType groupType) { + this.inspector = inspector; + + // Get the internal array structure + GroupType repeatedType = groupType.getType(0).asGroupType(); + this.repeatedGroupName = repeatedType.getName(); + + Type elementType = 
repeatedType.getType(0); + this.elementName = elementType.getName(); + + ObjectInspector elementInspector = this.inspector.getListElementObjectInspector(); + this.elementWriter = createWriter(elementInspector, elementType); + } - Map<?, ?> mapValues = inspector.getMap(value); + @Override + public void write(Object value) { + recordConsumer.startGroup(); + recordConsumer.startField(repeatedGroupName, 0); + + int listLength = inspector.getListLength(value); + for (int i = 0; i < listLength; i++) { + Object element = inspector.getListElement(value, i); + recordConsumer.startGroup(); + if (element != null) { + recordConsumer.startField(elementName, 0); + elementWriter.write(element); + recordConsumer.endField(elementName, 0); + } + recordConsumer.endGroup(); + } - Type keyType = repeatedType.getType(0); - String keyName = keyType.getName(); - ObjectInspector keyInspector = inspector.getMapKeyObjectInspector(); + recordConsumer.endField(repeatedGroupName, 0); + recordConsumer.endGroup(); + } + } - Type valuetype = repeatedType.getType(1); - String valueName = valuetype.getName(); - ObjectInspector valueInspector = inspector.getMapValueObjectInspector(); + private class MapDataWriter implements DataWriter { + private MapObjectInspector inspector; + private String repeatedGroupName; + private String keyName, valueName; + private DataWriter keyWriter, valueWriter; + + public MapDataWriter(MapObjectInspector inspector, GroupType groupType) { + this.inspector = inspector; + + // Get the internal map structure (MAP_KEY_VALUE) + GroupType repeatedType = groupType.getType(0).asGroupType(); + this.repeatedGroupName = repeatedType.getName(); + + // Get key element information + Type keyType = repeatedType.getType(0); + ObjectInspector keyInspector = this.inspector.getMapKeyObjectInspector(); + this.keyName = keyType.getName(); + this.keyWriter = createWriter(keyInspector, keyType); + + // Get value element information + Type valuetype = repeatedType.getType(1); + ObjectInspector 
valueInspector = this.inspector.getMapValueObjectInspector(); + this.valueName = valuetype.getName(); + this.valueWriter = createWriter(valueInspector, valuetype); + } - for (Map.Entry<?, ?> keyValue : mapValues.entrySet()) { + @Override + public void write(Object value) { recordConsumer.startGroup(); - if (keyValue != null) { - // write key element - Object keyElement = keyValue.getKey(); - recordConsumer.startField(keyName, 0); - writeValue(keyElement, keyInspector, keyType); - recordConsumer.endField(keyName, 0); - - // write value element - Object valueElement = keyValue.getValue(); - if (valueElement != null) { - recordConsumer.startField(valueName, 1); - writeValue(valueElement, valueInspector, valuetype); - recordConsumer.endField(valueName, 1); + recordConsumer.startField(repeatedGroupName, 0); + + Map<?, ?> mapValues = inspector.getMap(value); + for (Map.Entry<?, ?> keyValue : mapValues.entrySet()) { + recordConsumer.startGroup(); + if (keyValue != null) { + // write key element + Object keyElement = keyValue.getKey(); + recordConsumer.startField(keyName, 0); + keyWriter.write(keyElement); + recordConsumer.endField(keyName, 0); + + // write value element + Object valueElement = keyValue.getValue(); + if (valueElement != null) { + recordConsumer.startField(valueName, 1); + valueWriter.write(valueElement); + recordConsumer.endField(valueName, 1); + } } + recordConsumer.endGroup(); } + + recordConsumer.endField(repeatedGroupName, 0); recordConsumer.endGroup(); } + } + + private class BooleanDataWriter implements DataWriter { + private BooleanObjectInspector inspector; + + public BooleanDataWriter(BooleanObjectInspector inspector) { + this.inspector = inspector; + } - recordConsumer.endField(repeatedType.getName(), 0); - recordConsumer.endGroup(); + @Override + public void write(Object value) { + recordConsumer.addBoolean(inspector.get(value)); + } } - /** - * It writes the primitive value to the Parquet RecordConsumer. 
- * @param value The object that contains the primitive value. - * @param inspector The object inspector used to get the correct value type. - */ - private void writePrimitive(final Object value, final PrimitiveObjectInspector inspector) { - if (value == null) { - return; - } - - switch (inspector.getPrimitiveCategory()) { - case VOID: - return; - case DOUBLE: - recordConsumer.addDouble(((DoubleObjectInspector) inspector).get(value)); - break; - case BOOLEAN: - recordConsumer.addBoolean(((BooleanObjectInspector) inspector).get(value)); - break; - case FLOAT: - recordConsumer.addFloat(((FloatObjectInspector) inspector).get(value)); - break; - case BYTE: - recordConsumer.addInteger(((ByteObjectInspector) inspector).get(value)); - break; - case INT: - recordConsumer.addInteger(((IntObjectInspector) inspector).get(value)); - break; - case LONG: - recordConsumer.addLong(((LongObjectInspector) inspector).get(value)); - break; - case SHORT: - recordConsumer.addInteger(((ShortObjectInspector) inspector).get(value)); - break; - case STRING: - String v = ((StringObjectInspector) inspector).getPrimitiveJavaObject(value); - recordConsumer.addBinary(Binary.fromString(v)); - break; - case CHAR: - String vChar = ((HiveCharObjectInspector) inspector).getPrimitiveJavaObject(value).getStrippedValue(); - recordConsumer.addBinary(Binary.fromString(vChar)); - break; - case VARCHAR: - String vVarchar = ((HiveVarcharObjectInspector) inspector).getPrimitiveJavaObject(value).getValue(); - recordConsumer.addBinary(Binary.fromString(vVarchar)); - break; - case BINARY: - byte[] vBinary = ((BinaryObjectInspector) inspector).getPrimitiveJavaObject(value); - recordConsumer.addBinary(Binary.fromByteArray(vBinary)); - break; - case TIMESTAMP: - Timestamp ts = ((TimestampObjectInspector) inspector).getPrimitiveJavaObject(value); - recordConsumer.addBinary(NanoTimeUtils.getNanoTime(ts, false).toBinary()); - break; - case DECIMAL: - HiveDecimal vDecimal = 
((HiveDecimal)inspector.getPrimitiveJavaObject(value)); - DecimalTypeInfo decTypeInfo = (DecimalTypeInfo)inspector.getTypeInfo(); - recordConsumer.addBinary(decimalToBinary(vDecimal, decTypeInfo)); - break; - case DATE: - Date vDate = ((DateObjectInspector) inspector).getPrimitiveJavaObject(value); - recordConsumer.addInteger(DateWritable.dateToDays(vDate)); - break; - default: - throw new IllegalArgumentException("Unsupported primitive data type: " + inspector.getPrimitiveCategory()); + private class ByteDataWriter implements DataWriter { + private ByteObjectInspector inspector; + + public ByteDataWriter(ByteObjectInspector inspector) { + this.inspector = inspector; + } + + @Override + public void write(Object value) { + recordConsumer.addInteger(inspector.get(value)); + } + } + + private class ShortDataWriter implements DataWriter { + private ShortObjectInspector inspector; + public ShortDataWriter(ShortObjectInspector inspector) { + this.inspector = inspector; + } + + @Override + public void write(Object value) { + recordConsumer.addInteger(inspector.get(value)); + } + } + + private class IntDataWriter implements DataWriter { + private IntObjectInspector inspector; + + public IntDataWriter(IntObjectInspector inspector) { + this.inspector = inspector; + } + + @Override + public void write(Object value) { + recordConsumer.addInteger(inspector.get(value)); + } + } + + private class LongDataWriter implements DataWriter { + private LongObjectInspector inspector; + + public LongDataWriter(LongObjectInspector inspector) { + this.inspector = inspector; + } + + @Override + public void write(Object value) { + recordConsumer.addLong(inspector.get(value)); + } + } + + private class FloatDataWriter implements DataWriter { + private FloatObjectInspector inspector; + + public FloatDataWriter(FloatObjectInspector inspector) { + this.inspector = inspector; + } + + @Override + public void write(Object value) { + recordConsumer.addFloat(inspector.get(value)); + } + } + + private 
class DoubleDataWriter implements DataWriter { + private DoubleObjectInspector inspector; + + public DoubleDataWriter(DoubleObjectInspector inspector) { + this.inspector = inspector; + } + + @Override + public void write(Object value) { + recordConsumer.addDouble(inspector.get(value)); + } + } + + private class StringDataWriter implements DataWriter { + private StringObjectInspector inspector; + + public StringDataWriter(StringObjectInspector inspector) { + this.inspector = inspector; + } + + @Override + public void write(Object value) { + String v = inspector.getPrimitiveJavaObject(value); + recordConsumer.addBinary(Binary.fromString(v)); } } - private Binary decimalToBinary(final HiveDecimal hiveDecimal, final DecimalTypeInfo decimalTypeInfo) { - int prec = decimalTypeInfo.precision(); - int scale = decimalTypeInfo.scale(); - byte[] decimalBytes = hiveDecimal.setScale(scale).unscaledValue().toByteArray(); + private class CharDataWriter implements DataWriter { + private HiveCharObjectInspector inspector; - // Estimated number of bytes needed. - int precToBytes = ParquetHiveSerDe.PRECISION_TO_BYTE_COUNT[prec - 1]; - if (precToBytes == decimalBytes.length) { - // No padding needed. 
- return Binary.fromByteArray(decimalBytes); + public CharDataWriter(HiveCharObjectInspector inspector) { + this.inspector = inspector; } - byte[] tgt = new byte[precToBytes]; + @Override + public void write(Object value) { + String v = inspector.getPrimitiveJavaObject(value).getStrippedValue(); + recordConsumer.addBinary(Binary.fromString(v)); + } + } + + private class VarcharDataWriter implements DataWriter { + private HiveVarcharObjectInspector inspector; + + public VarcharDataWriter(HiveVarcharObjectInspector inspector) { + this.inspector = inspector; + } + + @Override + public void write(Object value) { + String v = inspector.getPrimitiveJavaObject(value).getValue(); + recordConsumer.addBinary(Binary.fromString(v)); + } + } + + private class BinaryDataWriter implements DataWriter { + private BinaryObjectInspector inspector; + + public BinaryDataWriter(BinaryObjectInspector inspector) { + this.inspector = inspector; + } + + @Override + public void write(Object value) { + byte[] vBinary = inspector.getPrimitiveJavaObject(value); + recordConsumer.addBinary(Binary.fromByteArray(vBinary)); + } + } + + private class TimestampDataWriter implements DataWriter { + private TimestampObjectInspector inspector; + + public TimestampDataWriter(TimestampObjectInspector inspector) { + this.inspector = inspector; + } + + @Override + public void write(Object value) { + Timestamp ts = inspector.getPrimitiveJavaObject(value); + recordConsumer.addBinary(NanoTimeUtils.getNanoTime(ts, false).toBinary()); + } + } + + private class DecimalDataWriter implements DataWriter { + private HiveDecimalObjectInspector inspector; + + public DecimalDataWriter(HiveDecimalObjectInspector inspector) { + this.inspector = inspector; + } + + @Override + public void write(Object value) { + HiveDecimal vDecimal = inspector.getPrimitiveJavaObject(value); + DecimalTypeInfo decTypeInfo = (DecimalTypeInfo)inspector.getTypeInfo(); + recordConsumer.addBinary(decimalToBinary(vDecimal, decTypeInfo)); + } + + 
private Binary decimalToBinary(final HiveDecimal hiveDecimal, final DecimalTypeInfo decimalTypeInfo) { + int prec = decimalTypeInfo.precision(); + int scale = decimalTypeInfo.scale(); + byte[] decimalBytes = hiveDecimal.setScale(scale).unscaledValue().toByteArray(); + + // Estimated number of bytes needed. + int precToBytes = ParquetHiveSerDe.PRECISION_TO_BYTE_COUNT[prec - 1]; + if (precToBytes == decimalBytes.length) { + // No padding needed. + return Binary.fromByteArray(decimalBytes); + } + + byte[] tgt = new byte[precToBytes]; if (hiveDecimal.signum() == -1) { - // For negative number, initializing bits to 1 - for (int i = 0; i < precToBytes; i++) { - tgt[i] |= 0xFF; + // For negative number, initializing bits to 1 + for (int i = 0; i < precToBytes; i++) { + tgt[i] |= 0xFF; + } } + + System.arraycopy(decimalBytes, 0, tgt, precToBytes - decimalBytes.length, decimalBytes.length); // Padding leading zeroes/ones. + return Binary.fromByteArray(tgt); } + } - System.arraycopy(decimalBytes, 0, tgt, precToBytes - decimalBytes.length, decimalBytes.length); // Padding leading zeroes/ones. - return Binary.fromByteArray(tgt); + private class DateDataWriter implements DataWriter { + private DateObjectInspector inspector; + + public DateDataWriter(DateObjectInspector inspector) { + this.inspector = inspector; + } + + @Override + public void write(Object value) { + Date vDate = inspector.getPrimitiveJavaObject(value); + recordConsumer.addInteger(DateWritable.dateToDays(vDate)); + } } -} +} \ No newline at end of file